/* Copyright 2019-2020 Michael Rowan, Yinjian Zhao * * This file is part of WarpX. * * License: BSD-3-Clause-LBNL */ #include "LoadBalanceCosts.H" #include "Diagnostics/ReducedDiags/ReducedDiags.H" #include "Particles/MultiParticleContainer.H" #include "Utils/TextMsg.H" #include "Utils/WarpXAlgorithmSelection.H" #include "WarpX.H" #include #include #include #include #include #include #include #include #include #ifdef AMREX_USE_MPI # include #endif #include #include #include #include #include #include #include using namespace amrex; namespace { amrex::Long countBoxMacroParticles (amrex::MFIter const & mfi, int const lev) { const int gid = mfi.index(); const int tid = mfi.LocalTileIndex(); auto box_index = std::make_pair(gid, tid); auto & warpx = WarpX::GetInstance(); MultiParticleContainer const & mpc = warpx.GetPartContainer(); int const nSpecies = mpc.nSpecies(); amrex::Long num_macro_particles = 0; for (int i_s = 0; i_s < nSpecies; ++i_s) { WarpXParticleContainer const & pc = mpc.GetParticleContainer(i_s); auto const & plev = pc.GetParticles(lev); auto const & ptile = plev.at(box_index); auto const & aos = ptile.GetArrayOfStructs(); auto const np = aos.numParticles(); num_macro_particles += np; } return num_macro_particles; } } // constructor LoadBalanceCosts::LoadBalanceCosts (std::string rd_name) : ReducedDiags{rd_name} { } // function that gathers costs void LoadBalanceCosts::ComputeDiags (int step) { // get a reference to WarpX instance auto& warpx = WarpX::GetInstance(); // judge if the diags should be done // costs is initialized only if we're doing load balance if (!m_intervals.contains(step+1) || !warpx.get_load_balance_intervals().isActivated() ) { return; } // get number of boxes over all levels auto nLevels = warpx.finestLevel() + 1; int nBoxes = 0; for (int lev = 0; lev < nLevels; ++lev) { const auto cost = WarpX::getCosts(lev); WARPX_ALWAYS_ASSERT_WITH_MESSAGE( cost, "ERROR: costs are not initialized on level " + std::to_string(lev) + " !"); nBoxes += cost->size(); } // keep track of the max number of boxes, this is needed later on to fill // the jagged array (in case each step does not have the same number of boxes) m_nBoxesMax = std::max(m_nBoxesMax, nBoxes); // resize and clear data array const size_t dataSize = static_cast(m_nDataFields)* static_cast(nBoxes); m_data.resize(dataSize, 0.0_rt); m_data.assign(dataSize, 0.0_rt); // read in WarpX costs to local copy; compute if using `Heuristic` update amrex::Vector > > costs; costs.resize(nLevels); for (int lev = 0; lev < nLevels; ++lev) { costs[lev] = std::make_unique>(*WarpX::getCosts(lev)); } if (WarpX::load_balance_costs_update_algo == LoadBalanceCostsUpdateAlgo::Heuristic) { warpx.ComputeCostsHeuristic(costs); } // keep track of correct index in array over all boxes on all levels // shift index for m_data int shift_m_data = 0; // save data for (int lev = 0; lev < nLevels; ++lev) { const amrex::DistributionMapping& dm = warpx.DistributionMap(lev); const MultiFab & Ex = warpx.getEfield(lev,0); for (MFIter mfi(Ex, false); mfi.isValid(); ++mfi) { const Box& tbx = mfi.tilebox(); m_data[shift_m_data + mfi.index()*m_nDataFields + 0] = (*costs[lev])[mfi.index()]; m_data[shift_m_data + mfi.index()*m_nDataFields + 1] = dm[mfi.index()]; m_data[shift_m_data + mfi.index()*m_nDataFields + 2] = lev; m_data[shift_m_data + mfi.index()*m_nDataFields + 3] = tbx.loVect()[0]; #if (AMREX_SPACEDIM >= 2) m_data[shift_m_data + mfi.index()*m_nDataFields + 4] = tbx.loVect()[1]; #else m_data[shift_m_data + mfi.index()*m_nDataFields + 4] = 0.; #endif #if defined(WARPX_DIM_3D) m_data[shift_m_data + mfi.index()*m_nDataFields + 5] = tbx.loVect()[2]; #else m_data[shift_m_data + mfi.index()*m_nDataFields + 5] = 0.; #endif m_data[shift_m_data + mfi.index()*m_nDataFields + 6] = tbx.d_numPts(); // note: difference to volume m_data[shift_m_data + mfi.index()*m_nDataFields + 7] = countBoxMacroParticles(mfi, lev); #ifdef AMREX_USE_GPU m_data[shift_m_data + mfi.index()*m_nDataFields + 8] = amrex::Gpu::Device::deviceId(); #endif // ... } // we looped through all the boxes on level lev, update the shift index shift_m_data += m_nDataFields*(costs[lev]->size()); } // parallel reduce to IO proc and get data over all procs ParallelDescriptor::ReduceRealSum(m_data.data(), m_data.size(), ParallelDescriptor::IOProcessorNumber()); #ifdef AMREX_USE_MPI // now parallel reduce to IO proc and get string data (host name) over all procs // MPI Gatherv preliminaries // get the MPI host name and number of characters char hostname[MPI_MAX_PROCESSOR_NAME]; int length; BL_MPI_REQUIRE( MPI_Get_processor_name( hostname, &length ) ); // IO proc will collect messages from other procs; // receive counts and displacements needed only by IO proc m_data_string_recvcount.resize(ParallelDescriptor::NProcs(), 0); m_data_string_recvcount.assign(ParallelDescriptor::NProcs(), 0); m_data_string_disp.resize(ParallelDescriptor::NProcs(), 0); m_data_string_disp.assign(ParallelDescriptor::NProcs(), 0); // get the string lengths on IO proc ParallelDescriptor::Gather(&length, 1, // send m_data_string_recvcount.data(), 1, // receive ParallelDescriptor::IOProcessorNumber()); // determine total length of collected strings for root, and set displacements; // + 1 is for chosen separation between words in the gathered string; this // chosen separator character is set further below when elements of // m_data_string_recvbuf are initialized m_data_string_recvbuf_length += m_data_string_recvcount[0] + 1; for (int i=1; ihostname ParallelDescriptor::Gatherv(&hostname[0], /* hostname ID */ length, /* length of hostname */ m_data_string_recvbuf.data(), /* write data into string buffer */ m_data_string_recvcount, /* how many messages to receive */ m_data_string_disp, /* starting position in recv buffer to place received msg */ ParallelDescriptor::IOProcessorNumber()); #endif // cleanup if (ParallelDescriptor::MyProc() == ParallelDescriptor::IOProcessorNumber()) { #ifdef AMREX_USE_MPI const std::string m_data_stdstring_recvbuf(m_data_string_recvbuf.begin(), m_data_string_recvbuf.end()); m_data_string = amrex::Tokenize(m_data_stdstring_recvbuf, " "); #endif } /* m_data now contains up-to-date values for: * [[cost, proc, lev, i_low, j_low, k_low, num_cells, num_macro_particles(, gpu_ID [if GPU run]) ] of box 0 at level 0, * [cost, proc, lev, i_low, j_low, k_low, num_cells, num_macro_particles(, gpu_ID [if GPU run]) ] of box 1 at level 0, * [cost, proc, lev, i_low, j_low, k_low, num_cells, num_macro_particles(, gpu_ID [if GPU run]) ] of box 2 at level 0, * ... * [cost, proc, lev, i_low, j_low, k_low num_cells, num_macro_particles(, gpu_ID [if GPU run]) ] of box 0 at level 1, * [cost, proc, lev, i_low, j_low, k_low, num_cells, num_macro_particles(, gpu_ID [if GPU run]) ] of box 1 at level 1, * [cost, proc, lev, i_low, j_low, k_low, num_cells, num_macro_particles(, gpu_ID [if GPU run]) ] of box 2 at level 1, * ...] * and m_data_string contains: * [hostname of box 0 at level 0, * hostname of box 1 at level 0, * hostname of box 2 at level 0, * ... * hostname of box 0 at level 1, * hostname of box 1 at level 1, * hostname of box 2 at level 1, * ...] */ } // write to file function for cost void LoadBalanceCosts::WriteToFile (int step) const { // open file std::ofstream ofs{m_path + m_rd_name + "." + m_extension, std::ofstream::out | std::ofstream::app}; // write step ofs << step+1 << m_sep; // set precision ofs << std::fixed << std::setprecision(14) << std::scientific; // write time ofs << WarpX::GetInstance().gett_new(0); // loop over data size and write for (int i = 0; i < static_cast(m_data.size()); ++i) { ofs << m_sep << m_data[i]; if ((i - m_nDataFields + 1)%m_nDataFields == 0) { // at the end of current group of m_nDatafields, output the string data (hostname) const int ind_rank = i - m_nDataFields + 2; // index for the rank corresponding to current box // m_data --> rank --> hostname ofs << m_sep << m_data_string[static_cast(m_data[ind_rank])]; } } // end loop over data size // end line ofs << std::endl; // close file ofs.close(); // get a reference to WarpX instance auto& warpx = WarpX::GetInstance(); if (!ParallelDescriptor::IOProcessor()) return; // final step is a special case, fill jagged array with NaN if (m_intervals.nextContains(step+1) > warpx.maxStep()) { // open tmp file to copy data const std::string fileTmpName = m_path + m_rd_name + ".tmp." + m_extension; std::ofstream ofstmp(fileTmpName, std::ofstream::out); // write header row // for each box on each level we saved 9(10) data fields: // [cost, proc, lev, i_low, j_low, k_low, num_cells, num_macro_particles(, gpu_ID_box), hostname] // nDataFieldsToWrite = below accounts for the Real data fields (m_nDataFields), then 1 string output to write const int nDataFieldsToWrite = m_nDataFields + 1; int c = 0; ofstmp << "#"; ofstmp << "[" << c++ << "]step()"; ofstmp << m_sep; ofstmp << "[" << c++ << "]time(s)"; for (int boxNumber=0; boxNumber