aboutsummaryrefslogtreecommitdiff
path: root/Regression/PostProcessingUtils/post_processing_utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'Regression/PostProcessingUtils/post_processing_utils.py')
-rw-r--r--Regression/PostProcessingUtils/post_processing_utils.py129
1 files changed, 129 insertions, 0 deletions
diff --git a/Regression/PostProcessingUtils/post_processing_utils.py b/Regression/PostProcessingUtils/post_processing_utils.py
new file mode 100644
index 000000000..1ce398121
--- /dev/null
+++ b/Regression/PostProcessingUtils/post_processing_utils.py
@@ -0,0 +1,129 @@
+# Copyright 2021-2021 Neil Zaim
+#
+# This file is part of WarpX.
+#
+# License: BSD-3-Clause-LBNL
+
+## This file contains functions that are used in multiple CI analysis scripts.
+
+import numpy as np
+import yt
+
## This is a generic function to test a particle filter. We reproduce the filter in python and
## verify that the results are the same as with the WarpX filtered diagnostic.
def check_particle_filter(fn, filtered_fn, filter_expression, dim, species_name):
    """Verify a WarpX filtered particle diagnostic against the same filter applied in python.

    Parameters
    ----------
    fn : str
        Path of the unfiltered plotfile (loaded with yt).
    filtered_fn : str
        Path of the plotfile produced with the WarpX particle filter.
    filter_expression : str
        A python expression evaluated with ``eval`` in this function's local scope.
        It must reference the local arrays loaded below (``ids``, ``cpus``, ``px``,
        ``py``, ``pz``, ``w`` and, depending on ``dim``, ``x``/``y``/``z``/``r``/``theta``)
        and return a boolean mask over the unfiltered particles.
    dim : str
        One of "2d", "3d", "rz"; selects which position fields are read.
    species_name : str
        Name of the particle species in both diagnostics.

    Raises
    ------
    AssertionError
        If the filtered diagnostic does not match the python-side filter.
    """
    ds = yt.load( fn )
    ds_filtered = yt.load( filtered_fn )
    ad = ds.all_data()
    ad_filtered = ds_filtered.all_data()

    ## Load arrays from the unfiltered diagnostic
    ids = ad[species_name, 'particle_id'].to_ndarray()
    cpus = ad[species_name, 'particle_cpu'].to_ndarray()
    px = ad[species_name, 'particle_momentum_x'].to_ndarray()
    pz = ad[species_name, 'particle_momentum_z'].to_ndarray()
    py = ad[species_name, 'particle_momentum_y'].to_ndarray()
    w = ad[species_name, 'particle_weight'].to_ndarray()
    if (dim == "2d"):
        # NOTE(review): in 2D/RZ the second simulation coordinate appears to be exposed
        # by yt as 'particle_position_y', hence z (or r/z) reads that field — confirm
        # against the yt frontend for WarpX.
        x = ad[species_name, 'particle_position_x'].to_ndarray()
        z = ad[species_name, 'particle_position_y'].to_ndarray()
    elif (dim == "3d"):
        x = ad[species_name, 'particle_position_x'].to_ndarray()
        y = ad[species_name, 'particle_position_y'].to_ndarray()
        z = ad[species_name, 'particle_position_z'].to_ndarray()
    elif (dim == "rz"):
        r = ad[species_name, 'particle_position_x'].to_ndarray()
        z = ad[species_name, 'particle_position_y'].to_ndarray()
        theta = ad[species_name, 'particle_theta'].to_ndarray()

    ## Load arrays from the filtered diagnostic
    ids_filtered_warpx = ad_filtered[species_name, 'particle_id'].to_ndarray()
    cpus_filtered_warpx = ad_filtered[species_name, 'particle_cpu'].to_ndarray()
    px_filtered_warpx = ad_filtered[species_name, 'particle_momentum_x'].to_ndarray()
    pz_filtered_warpx = ad_filtered[species_name, 'particle_momentum_z'].to_ndarray()
    py_filtered_warpx = ad_filtered[species_name, 'particle_momentum_y'].to_ndarray()
    w_filtered_warpx = ad_filtered[species_name, 'particle_weight'].to_ndarray()
    if (dim == "2d"):
        x_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
        z_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
    elif (dim == "3d"):
        x_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
        y_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
        z_filtered_warpx = ad_filtered[species_name, 'particle_position_z'].to_ndarray()
    elif (dim == "rz"):
        r_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
        z_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
        theta_filtered_warpx = ad_filtered[species_name, 'particle_theta'].to_ndarray()

    ## Reproduce the filter in python: this returns the indices of the filtered particles in the
    ## unfiltered arrays.
    ## SECURITY NOTE: eval() executes arbitrary code; filter_expression must only come from
    ## trusted CI scripts, never from external input. The expression relies on the local
    ## variable names loaded above — do not rename them.
    ind_filtered_python, = np.where(eval(filter_expression))

    ## Sort the indices of the filtered arrays by particle id.
    ## Sorting by id gives a common ordering so the two diagnostics can be compared
    ## element by element regardless of how each plotfile ordered its particles.
    sorted_ind_filtered_python = ind_filtered_python[np.argsort(ids[ind_filtered_python])]
    sorted_ind_filtered_warpx = np.argsort(ids_filtered_warpx)

    ## Check that the sorted ids are exactly the same with the warpx filter and the filter
    ## reproduced in python
    assert(np.array_equal(ids[sorted_ind_filtered_python],
                          ids_filtered_warpx[sorted_ind_filtered_warpx]))
    assert(np.array_equal(cpus[sorted_ind_filtered_python],
                          cpus_filtered_warpx[sorted_ind_filtered_warpx]))

    ## Finally, we check that the sum of the particles quantities are the same to machine precision
    tolerance_checksum = 1.e-12
    check_array_sum(px[sorted_ind_filtered_python],
                    px_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    check_array_sum(pz[sorted_ind_filtered_python],
                    pz_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    check_array_sum(py[sorted_ind_filtered_python],
                    py_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    check_array_sum(w[sorted_ind_filtered_python],
                    w_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    # z exists in all three geometries, so it is checked unconditionally.
    check_array_sum(z[sorted_ind_filtered_python],
                    z_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    if (dim == "2d"):
        check_array_sum(x[sorted_ind_filtered_python],
                        x_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    elif (dim == "3d"):
        check_array_sum(x[sorted_ind_filtered_python],
                        x_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
        check_array_sum(y[sorted_ind_filtered_python],
                        y_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    elif (dim == "rz"):
        check_array_sum(r[sorted_ind_filtered_python],
                        r_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
        check_array_sum(theta[sorted_ind_filtered_python],
                        theta_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
+
## This function checks that the absolute sums of two arrays are the same to a required precision
def check_array_sum(array1, array2, tolerance_checksum):
    """Assert that the sums of absolute values of two arrays agree to a relative tolerance.

    Parameters
    ----------
    array1, array2 : numpy.ndarray
        Arrays whose absolute sums are compared (array1's sum is the reference).
    tolerance_checksum : float
        Maximum allowed relative difference between the two sums.

    Raises
    ------
    AssertionError
        If the relative difference of the absolute sums exceeds the tolerance.
    """
    sum1 = np.sum(np.abs(array1))
    sum2 = np.sum(np.abs(array2))
    if sum1 == 0.:
        # Degenerate case (empty or all-zero reference array): the relative error
        # would be 0/0 -> NaN and the comparison below would spuriously fail, so
        # instead require the other sum to vanish as well.
        assert(sum2 == 0.)
    else:
        assert(abs(sum2-sum1)/sum1 < tolerance_checksum)
+
## This function is specifically used to test the random filter. First, we check that the number of
## dumped particles is as expected. Next, we call the generic check_particle_filter function.
def check_random_filter(fn, filtered_fn, random_fraction, dim, species_name):
    """Verify a WarpX random-fraction particle filter.

    First checks that the number of dumped particles is statistically consistent
    with ``random_fraction`` (5-sigma test), then delegates to
    ``check_particle_filter`` to verify that the dumped particles are a genuine
    subset of the unfiltered diagnostic.

    Parameters
    ----------
    fn : str
        Path of the unfiltered plotfile.
    filtered_fn : str
        Path of the plotfile produced with the random filter.
    random_fraction : float
        Fraction of particles that the filter is expected to keep.
    dim : str
        One of "2d", "3d", "rz" (forwarded to check_particle_filter).
    species_name : str
        Name of the particle species in both diagnostics.

    Raises
    ------
    AssertionError
        If the particle count deviates by more than 5 sigma, or if the
        subset check in check_particle_filter fails.
    """
    ds = yt.load( fn )
    ds_filtered = yt.load( filtered_fn )
    ad = ds.all_data()
    ad_filtered = ds_filtered.all_data()

    ## Check that the number of particles is as expected
    numparts = ad[species_name, 'particle_id'].to_ndarray().shape[0]
    # Use the (species_name, field) key here as well, for consistency with the line
    # above and with check_particle_filter; the bare 'particle_id' key would mix in
    # particles of other species if the filtered diagnostic contains several.
    numparts_filtered = ad_filtered[species_name, 'particle_id'].to_ndarray().shape[0]
    expected_numparts_filtered = random_fraction*numparts
    # 5 sigma test that has an intrinsic probability to fail of 1 over ~2 millions
    # (the count of kept particles is binomial, approximated here as Poisson:
    # sigma ~ sqrt(expected count))
    std_numparts_filtered = np.sqrt(expected_numparts_filtered)
    error = abs(numparts_filtered-expected_numparts_filtered)
    print("Random filter: difference between expected and actual number of dumped particles: " \
          + str(error))
    print("tolerance: " + str(5*std_numparts_filtered))
    assert(error<5*std_numparts_filtered)

    ## Dirty trick to find particles with the same ID + same CPU (does not work with more than 10
    ## MPI ranks)
    random_filter_expression = 'np.isin(ids + 0.1*cpus,' \
                               'ids_filtered_warpx + 0.1*cpus_filtered_warpx)'
    check_particle_filter(fn, filtered_fn, random_filter_expression, dim, species_name)