diff options
Diffstat (limited to 'Source')
-rw-r--r-- | Source/Evolve/WarpXEvolve.cpp | 27 | ||||
-rw-r--r-- | Source/WarpX.H | 5 | ||||
-rw-r--r-- | Source/WarpX.cpp | 48 | ||||
-rw-r--r-- | Source/ablastr/utils/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Source/ablastr/utils/Make.package | 2 | ||||
-rw-r--r-- | Source/ablastr/utils/SignalHandling.H | 88 | ||||
-rw-r--r-- | Source/ablastr/utils/SignalHandling.cpp | 192 |
7 files changed, 361 insertions, 2 deletions
diff --git a/Source/Evolve/WarpXEvolve.cpp b/Source/Evolve/WarpXEvolve.cpp index 049806059..78c4713da 100644 --- a/Source/Evolve/WarpXEvolve.cpp +++ b/Source/Evolve/WarpXEvolve.cpp @@ -33,6 +33,8 @@ #include "Utils/WarpXProfilerWrapper.H" #include "Utils/WarpXUtil.H" +#include <ablastr/utils/SignalHandling.H> + #include <AMReX.H> #include <AMReX_Array.H> #include <AMReX_BLassert.H> @@ -53,6 +55,7 @@ #include <vector> using namespace amrex; +using ablastr::utils::SignalHandling; void WarpX::Evolve (int numsteps) @@ -79,6 +82,8 @@ WarpX::Evolve (int numsteps) WARPX_PROFILE("WarpX::Evolve::step"); Real evolve_time_beg_step = amrex::second(); + CheckSignals(); + multi_diags->NewIteration(); // Start loop on time steps @@ -342,6 +347,8 @@ WarpX::Evolve (int numsteps) Real evolve_time_end_step = amrex::second(); evolve_time += evolve_time_end_step - evolve_time_beg_step; + HandleSignals(); + if (verbose) { amrex::Print()<< "STEP " << step+1 << " ends." << " TIME = " << cur_time << " DT = " << dt[0] << "\n"; @@ -350,7 +357,7 @@ WarpX::Evolve (int numsteps) << " s; Avg. per step = " << evolve_time/(step-step_begin+1) << " s\n"; } - if (cur_time >= stop_time - 1.e-3*dt[0]) { + if (cur_time >= stop_time - 1.e-3*dt[0] || SignalHandling::TestAndResetActionRequestFlag(SignalHandling::SIGNAL_REQUESTS_BREAK)) { break; } @@ -949,3 +956,21 @@ WarpX::applyMirrors(Real time){ } } } + +void +WarpX::CheckSignals() +{ + SignalHandling::CheckSignals(); +} + +void +WarpX::HandleSignals() +{ + SignalHandling::WaitSignals(); + + // SIGNAL_REQUESTS_BREAK is handled directly in WarpX::Evolve + + if (SignalHandling::TestAndResetActionRequestFlag(SignalHandling::SIGNAL_REQUESTS_CHECKPOINT)) { + multi_diags->FilterComputePackFlushLastTimestep( istep[0] ); + } +} diff --git a/Source/WarpX.H b/Source/WarpX.H index f7060c9b3..5c1b19dd7 100644 --- a/Source/WarpX.H +++ b/Source/WarpX.H @@ -963,6 +963,11 @@ private: // Singleton is used when the code is run from python static WarpX* m_instance; + //! Check and clear signal flags and asynchronously broadcast them from process 0 + static void CheckSignals (); + //! Complete the asynchronous broadcast of signal flags, and initiate a checkpoint if requested + void HandleSignals (); + /// /// Advance the simulation by numsteps steps, electromagnetic case. /// diff --git a/Source/WarpX.cpp b/Source/WarpX.cpp index 3d11074df..e2f88e751 100644 --- a/Source/WarpX.cpp +++ b/Source/WarpX.cpp @@ -38,6 +38,8 @@ #include "Utils/WarpXProfilerWrapper.H" #include "Utils/WarpXUtil.H" +#include <ablastr/utils/SignalHandling.H> + #ifdef AMREX_USE_SENSEI_INSITU # include <AMReX_AmrMeshInSituBridge.H> #endif @@ -233,6 +235,8 @@ WarpX::WarpX () InitEB(); + ablastr::utils::SignalHandling::InitSignalHandling(); + // Geometry on all levels has been defined already. // No valid BoxArray and DistributionMapping have been defined. // But the arrays for them have been resized. @@ -560,6 +564,50 @@ WarpX::ReadParameters () } } + using ablastr::utils::SignalHandling; + std::vector<std::string> signals_in; + pp_warpx.queryarr("break_signals", signals_in); + +#if defined(__linux__) || defined(__APPLE__) + for (const std::string &str : signals_in) { + int sig = SignalHandling::parseSignalNameToNumber(str); + SignalHandling::signal_conf_requests[SignalHandling::SIGNAL_REQUESTS_BREAK][sig] = true; + } + signals_in.clear(); +#else + WARPX_ALWAYS_ASSERT_WITH_MESSAGE(signals_in.empty(), + "Signal handling requested in input, but is not supported on this platform"); +#endif + + bool have_checkpoint_diagnostic = false; + + ParmParse pp("diagnostics"); + std::vector<std::string> diags_names; + pp.queryarr("diags_names", diags_names); + + for (const auto &diag : diags_names) { + ParmParse dd(diag); + std::string format; + dd.query("format", format); + if (format == "checkpoint") { + have_checkpoint_diagnostic = true; + break; + } + } + + pp_warpx.queryarr("checkpoint_signals", signals_in); +#if defined(__linux__) || defined(__APPLE__) + for (const std::string &str : signals_in) { + int sig = SignalHandling::parseSignalNameToNumber(str); + SignalHandling::signal_conf_requests[SignalHandling::SIGNAL_REQUESTS_CHECKPOINT][sig] = true; + WARPX_ALWAYS_ASSERT_WITH_MESSAGE(have_checkpoint_diagnostic, + "Signal handling was requested to checkpoint, but no checkpoint diagnostic is configured"); + } +#else + WARPX_ALWAYS_ASSERT_WITH_MESSAGE(signals_in.empty(), + "Signal handling requested in input, but is not supported on this platform"); +#endif + // set random seed std::string random_seed = "default"; pp_warpx.query("random_seed", random_seed); diff --git a/Source/ablastr/utils/CMakeLists.txt b/Source/ablastr/utils/CMakeLists.txt index b452dc45c..7a174a174 100644 --- a/Source/ablastr/utils/CMakeLists.txt +++ b/Source/ablastr/utils/CMakeLists.txt @@ -1,4 +1,5 @@ target_sources(ablastr PRIVATE TextMsg.cpp + SignalHandling.cpp ) diff --git a/Source/ablastr/utils/Make.package b/Source/ablastr/utils/Make.package index 8fbee2098..dd343d9c1 100644 --- a/Source/ablastr/utils/Make.package +++ b/Source/ablastr/utils/Make.package @@ -1,3 +1,3 @@ -CEXE_sources += TextMsg.cpp +CEXE_sources += TextMsg.cpp SignalHandling.cpp VPATH_LOCATIONS += $(WARPX_HOME)/Source/ablastr/utils diff --git a/Source/ablastr/utils/SignalHandling.H b/Source/ablastr/utils/SignalHandling.H new file mode 100644 index 000000000..b633c3860 --- /dev/null +++ b/Source/ablastr/utils/SignalHandling.H @@ -0,0 +1,88 @@ +/* Copyright 2022 Philip Miller + * + * This file is part of WarpX. + * + * License: BSD-3-Clause-LBNL + */ + +#ifndef ABLASTR_SIGNAL_HANDLING_H_ +#define ABLASTR_SIGNAL_HANDLING_H_ + +#include <AMReX_Config.H> + +#if defined(AMREX_USE_MPI) +# include <mpi.h> +#endif + +#include <atomic> +#include <string> + +namespace ablastr::utils { + +/** + * \brief + * Signal handling + * + * Rank 0 will accept signals and asynchronously broadcast the + * configured response; other processes will ignore them and + * follow the lead of rank 0 to avoid potential for deadlocks or + * timestep-skewed response. + * + * Variables and functions are static rather than per-instance + * because signal handlers are configured at the process level. + */ +class SignalHandling +{ +public: + //! The range of signal values to accept + static constexpr int NUM_SIGNALS = 32; + + //! Labels for indexed positions in signal_actions_requests + enum signal_action_requested_labels { + //! Cleanly stop execution, as if the simulation reached its configured end + SIGNAL_REQUESTS_BREAK = 0, + //! Produce a checkpoint + SIGNAL_REQUESTS_CHECKPOINT = 1, + SIGNAL_REQUESTS_SIZE = 2 // This should always be 1 greater than the last valid value + }; + + //! Whether configuration requests the code take a particular action on a particular signal + static bool signal_conf_requests[SIGNAL_REQUESTS_SIZE][NUM_SIGNALS]; + + //! Take a string and convert it to a corresponding signal number if possible + static int parseSignalNameToNumber (const std::string &str); + + //! Set up signal handlers based on input configuration provided in `signal_conf_requests_*` + static void InitSignalHandling (); + + //! Check and clear signal flags and asynchronously broadcast them from process 0 + static void CheckSignals (); + //! Complete the asynchronous broadcast of signal flags + static void WaitSignals (); + + //! Check whether a given action has been requested, and reset the associated flag + static bool TestAndResetActionRequestFlag (int action_to_test); + +private: + //! On process 0, whether a given signal has been received since the last check + static std::atomic<bool> signal_received_flags[NUM_SIGNALS]; + +#if defined(AMREX_USE_MPI) + //! MPI requests for the asynchronous broadcasts of the signal-requested actions + static MPI_Request signal_mpi_ibcast_request; +#endif + + //! Signal handler to set flags on process 0 (other processes ignore configured signals) + static void SignalSetFlag (int signal_number); + + //! Boolean flags transmitted between CheckSignals() and + //! HandleSignals() to indicate actions requested by signals + static bool signal_actions_requested[SIGNAL_REQUESTS_SIZE]; + + // Don't allow clients to incorrectly try to construct and use an instance of this type + SignalHandling() = delete; +}; + +} // namespace ablastr::utils + +#endif // ABLASTR_SIGNAL_HANDLING_H_ diff --git a/Source/ablastr/utils/SignalHandling.cpp b/Source/ablastr/utils/SignalHandling.cpp new file mode 100644 index 000000000..cdec9b653 --- /dev/null +++ b/Source/ablastr/utils/SignalHandling.cpp @@ -0,0 +1,192 @@ +/* Copyright 2022 Philip Miller + * + * This file is part of WarpX. + * + * License: BSD-3-Clause-LBNL + */ + +#include "SignalHandling.H" +#include "TextMsg.H" + +#include <AMReX.H> +#include <AMReX_ParallelDescriptor.H> +#include <AMReX_IParser.H> + +#include <cctype> + +// For sigaction() et al. +#if defined(__linux__) || defined(__APPLE__) +# include <signal.h> +#endif + +namespace ablastr::utils { + +std::atomic<bool> SignalHandling::signal_received_flags[NUM_SIGNALS]; +bool SignalHandling::signal_conf_requests[SIGNAL_REQUESTS_SIZE][NUM_SIGNALS]; +bool SignalHandling::signal_actions_requested[SIGNAL_REQUESTS_SIZE]; +#if defined(AMREX_USE_MPI) +MPI_Request SignalHandling::signal_mpi_ibcast_request; +#endif + +int +SignalHandling::parseSignalNameToNumber(const std::string &str) +{ + amrex::IParser signals_parser(str); + +#if defined(__linux__) || defined(__APPLE__) + struct { + const char* abbrev; + const int value; + } signals_to_parse[] = { + {"ABRT", SIGABRT}, + {"ALRM", SIGALRM}, + {"BUS", SIGBUS}, + {"CHLD", SIGCHLD}, + {"CLD", SIGCHLD}, // Synonymous to SIGCHLD on Linux + {"CONT", SIGCONT}, +#if defined(SIGEMT) + {"EMT", SIGEMT}, // macOS and some Linux architectures +#endif + // Omitted because AMReX typically handles SIGFPE specially + // {"FPE", SIGFPE}, + {"HUP", SIGHUP}, + {"ILL", SIGILL}, +#if defined(SIGINFO) + {"INFO", SIGINFO}, // macOS and some Linux architectures +#endif + {"INT", SIGINT}, + {"IO", SIGIO}, + {"IOT", SIGABRT}, // Synonymous to SIGABRT on Linux + // {"KILL", SIGKILL}, // Cannot be handled + {"PIPE", SIGPIPE}, + {"POLL", SIGIO}, // Synonymous to SIGIO on Linux + {"PROF", SIGPROF}, +#if defined(SIGPWR) + {"PWR", SIGPWR}, // Linux-only +#endif + {"QUIT", SIGQUIT}, + {"SEGV", SIGSEGV}, +#if defined(SIGSTKFLT) + {"STKFLT", SIGSTKFLT}, // Linux-only +#endif + // {"STOP", SIGSTOP}, // Cannot be handled + {"SYS", SIGSYS}, + {"TERM", SIGTERM}, + {"TRAP", SIGTRAP}, + {"TSTP", SIGTSTP}, + {"TTIN", SIGTTIN}, + {"TTOU", SIGTTOU}, + {"URG", SIGURG}, + {"USR1", SIGUSR1}, + {"USR2", SIGUSR2}, + {"VTALRM", SIGVTALRM}, + {"WINCH", SIGWINCH}, + {"XCPU", SIGXCPU}, + {"XFSZ", SIGXFSZ}, + }; + + for (const auto& sp : signals_to_parse) { + std::string name_upper = sp.abbrev; + std::string name_lower = name_upper; + for (char &c : name_lower) { + c = std::tolower(c); + } + + signals_parser.setConstant(name_upper, sp.value); + signals_parser.setConstant(name_lower, sp.value); + name_upper = "SIG" + name_upper; + name_lower = "sig" + name_lower; + signals_parser.setConstant(name_upper, sp.value); + signals_parser.setConstant(name_lower, sp.value); + } +#endif // #if defined(__linux__) || defined(__APPLE__) + + auto spf = signals_parser.compileHost<0>(); + + int sig = spf(); + ABLASTR_ALWAYS_ASSERT_WITH_MESSAGE(sig < NUM_SIGNALS, + "Parsed signal value is outside the supported range of [1, 31]"); + + return sig; +} + +void +SignalHandling::InitSignalHandling() +{ +#if defined(__linux__) || defined(__APPLE__) + struct sigaction sa; + sigemptyset(&sa.sa_mask); + for (int signal_number = 0; signal_number < NUM_SIGNALS; ++signal_number) { + signal_received_flags[signal_number] = false; + + bool signal_active = false; + for (int signal_request = 0; signal_request < SIGNAL_REQUESTS_SIZE; ++signal_request) { + signal_active |= signal_conf_requests[signal_request][signal_number]; + } + if (signal_active) { + if (amrex::ParallelDescriptor::MyProc() == 0) { + sa.sa_handler = &SignalHandling::SignalSetFlag; + } else { + sa.sa_handler = SIG_IGN; + } + int result = sigaction(signal_number, &sa, nullptr); + ABLASTR_ALWAYS_ASSERT_WITH_MESSAGE(result == 0, + "Failed to install signal handler for a configured signal"); + } + } +#endif +} + +void +SignalHandling::CheckSignals() +{ + // We assume that signals will definitely be delivered to rank 0, + // and may be delivered to other ranks as well. For coordination, + // we process them according to when they're received by rank 0. + if (amrex::ParallelDescriptor::MyProc() == 0) { + for (int signal_number = 0; signal_number < NUM_SIGNALS; ++signal_number) { + // Read into a local temporary to ensure the same value is + // used throughout. Atomically exchange it with false to + // unset the flag without risking loss of a signal - if a + // signal arrives after this, it will be handled the next + // time this function is called. + bool signal_received = signal_received_flags[signal_number].exchange(false); + + if (signal_received) { + for (int signal_request = 0; signal_request < SIGNAL_REQUESTS_SIZE; ++signal_request) { + signal_actions_requested[signal_request] |= signal_conf_requests[signal_request][signal_number]; + } + } + } + } + +#if defined(AMREX_USE_MPI) + auto comm = amrex::ParallelDescriptor::Communicator(); + BL_MPI_REQUIRE(MPI_Ibcast(signal_actions_requested, SIGNAL_REQUESTS_SIZE, + MPI_CXX_BOOL, 0, comm,&signal_mpi_ibcast_request)); +#endif +} + +void +SignalHandling::WaitSignals() +{ +#if defined(AMREX_USE_MPI) + BL_MPI_REQUIRE(MPI_Wait(&signal_mpi_ibcast_request, MPI_STATUS_IGNORE)); +#endif +} + +bool +SignalHandling::TestAndResetActionRequestFlag(int action_to_test) +{ + bool retval = signal_actions_requested[action_to_test]; + signal_actions_requested[action_to_test] = false; + return retval; +} + +void +SignalHandling::SignalSetFlag(int signal_number) +{ + signal_received_flags[signal_number] = true; +} + +} // namespace ablastr::utils |