diff options
Diffstat (limited to 'Regression/PostProcessingUtils/post_processing_utils.py')
-rw-r--r-- | Regression/PostProcessingUtils/post_processing_utils.py | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/Regression/PostProcessingUtils/post_processing_utils.py b/Regression/PostProcessingUtils/post_processing_utils.py new file mode 100644 index 000000000..1ce398121 --- /dev/null +++ b/Regression/PostProcessingUtils/post_processing_utils.py @@ -0,0 +1,129 @@ +# Copyright 2021-2021 Neil Zaim +# +# This file is part of WarpX. +# +# License: BSD-3-Clause-LBNL + +## This file contains functions that are used in multiple CI analysis scripts. + +import numpy as np +import yt + +## This is a generic function to test a particle filter. We reproduce the filter in python and +## verify that the results are the same as with the WarpX filtered diagnostic. +def check_particle_filter(fn, filtered_fn, filter_expression, dim, species_name): + ds = yt.load( fn ) + ds_filtered = yt.load( filtered_fn ) + ad = ds.all_data() + ad_filtered = ds_filtered.all_data() + + ## Load arrays from the unfiltered diagnostic + ids = ad[species_name, 'particle_id'].to_ndarray() + cpus = ad[species_name, 'particle_cpu'].to_ndarray() + px = ad[species_name, 'particle_momentum_x'].to_ndarray() + pz = ad[species_name, 'particle_momentum_z'].to_ndarray() + py = ad[species_name, 'particle_momentum_y'].to_ndarray() + w = ad[species_name, 'particle_weight'].to_ndarray() + if (dim == "2d"): + x = ad[species_name, 'particle_position_x'].to_ndarray() + z = ad[species_name, 'particle_position_y'].to_ndarray() + elif (dim == "3d"): + x = ad[species_name, 'particle_position_x'].to_ndarray() + y = ad[species_name, 'particle_position_y'].to_ndarray() + z = ad[species_name, 'particle_position_z'].to_ndarray() + elif (dim == "rz"): + r = ad[species_name, 'particle_position_x'].to_ndarray() + z = ad[species_name, 'particle_position_y'].to_ndarray() + theta = ad[species_name, 'particle_theta'].to_ndarray() + + ## Load arrays from the filtered diagnostic + ids_filtered_warpx = ad_filtered[species_name, 'particle_id'].to_ndarray() + cpus_filtered_warpx = ad_filtered[species_name, 'particle_cpu'].to_ndarray() + px_filtered_warpx = ad_filtered[species_name, 'particle_momentum_x'].to_ndarray() + pz_filtered_warpx = ad_filtered[species_name, 'particle_momentum_z'].to_ndarray() + py_filtered_warpx = ad_filtered[species_name, 'particle_momentum_y'].to_ndarray() + w_filtered_warpx = ad_filtered[species_name, 'particle_weight'].to_ndarray() + if (dim == "2d"): + x_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray() + z_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray() + elif (dim == "3d"): + x_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray() + y_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray() + z_filtered_warpx = ad_filtered[species_name, 'particle_position_z'].to_ndarray() + elif (dim == "rz"): + r_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray() + z_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray() + theta_filtered_warpx = ad_filtered[species_name, 'particle_theta'].to_ndarray() + + ## Reproduce the filter in python: this returns the indices of the filtered particles in the + ## unfiltered arrays. + ind_filtered_python, = np.where(eval(filter_expression)) + + ## Sort the indices of the filtered arrays by particle id. + sorted_ind_filtered_python = ind_filtered_python[np.argsort(ids[ind_filtered_python])] + sorted_ind_filtered_warpx = np.argsort(ids_filtered_warpx) + + ## Check that the sorted ids are exactly the same with the warpx filter and the filter + ## reproduced in python + assert(np.array_equal(ids[sorted_ind_filtered_python], + ids_filtered_warpx[sorted_ind_filtered_warpx])) + assert(np.array_equal(cpus[sorted_ind_filtered_python], + cpus_filtered_warpx[sorted_ind_filtered_warpx])) + + ## Finally, we check that the sum of the particles quantities are the same to machine precision + tolerance_checksum = 1.e-12 + check_array_sum(px[sorted_ind_filtered_python], + px_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + check_array_sum(pz[sorted_ind_filtered_python], + pz_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + check_array_sum(py[sorted_ind_filtered_python], + py_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + check_array_sum(w[sorted_ind_filtered_python], + w_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + check_array_sum(z[sorted_ind_filtered_python], + z_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + if (dim == "2d"): + check_array_sum(x[sorted_ind_filtered_python], + x_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + elif (dim == "3d"): + check_array_sum(x[sorted_ind_filtered_python], + x_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + check_array_sum(y[sorted_ind_filtered_python], + y_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + elif (dim == "rz"): + check_array_sum(r[sorted_ind_filtered_python], + r_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + check_array_sum(theta[sorted_ind_filtered_python], + theta_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum) + +## This function checks that the absolute sums of two arrays are the same to a required precision +def check_array_sum(array1, array2, tolerance_checksum): + sum1 = np.sum(np.abs(array1)) + sum2 = np.sum(np.abs(array2)) + assert(abs(sum2-sum1)/sum1 < tolerance_checksum) + +## This function is specifically used to test the random filter. First, we check that the number of +## dumped particles is as expected. Next, we call the generic check_particle_filter function. +def check_random_filter(fn, filtered_fn, random_fraction, dim, species_name): + ds = yt.load( fn ) + ds_filtered = yt.load( filtered_fn ) + ad = ds.all_data() + ad_filtered = ds_filtered.all_data() + + ## Check that the number of particles is as expected + numparts = ad[species_name, 'particle_id'].to_ndarray().shape[0] + numparts_filtered = ad_filtered['particle_id'].to_ndarray().shape[0] + expected_numparts_filtered = random_fraction*numparts + # 5 sigma test that has an intrinsic probability to fail of 1 over ~2 millions + std_numparts_filtered = np.sqrt(expected_numparts_filtered) + error = abs(numparts_filtered-expected_numparts_filtered) + print("Random filter: difference between expected and actual number of dumped particles: " \ + + str(error)) + print("tolerance: " + str(5*std_numparts_filtered)) + assert(error<5*std_numparts_filtered) + + ## Dirty trick to find particles with the same ID + same CPU (does not work with more than 10 + ## MPI ranks) + random_filter_expression = 'np.isin(ids + 0.1*cpus,' \ + 'ids_filtered_warpx + 0.1*cpus_filtered_warpx)' + check_particle_filter(fn, filtered_fn, random_filter_expression, dim, species_name) |