1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# Copyright 2021-2021 Neil Zaim
#
# This file is part of WarpX.
#
# License: BSD-3-Clause-LBNL
## This file contains functions that are used in multiple CI analysis scripts.
import numpy as np
import yt
## This is a generic function to test a particle filter. We reproduce the filter in python and
## verify that the results are the same as with the WarpX filtered diagnostic.
def check_particle_filter(fn, filtered_fn, filter_expression, dim, species_name):
ds = yt.load( fn )
ds_filtered = yt.load( filtered_fn )
ad = ds.all_data()
ad_filtered = ds_filtered.all_data()
## Load arrays from the unfiltered diagnostic
ids = ad[species_name, 'particle_id'].to_ndarray()
cpus = ad[species_name, 'particle_cpu'].to_ndarray()
px = ad[species_name, 'particle_momentum_x'].to_ndarray()
pz = ad[species_name, 'particle_momentum_z'].to_ndarray()
py = ad[species_name, 'particle_momentum_y'].to_ndarray()
w = ad[species_name, 'particle_weight'].to_ndarray()
if (dim == "2d"):
x = ad[species_name, 'particle_position_x'].to_ndarray()
z = ad[species_name, 'particle_position_y'].to_ndarray()
elif (dim == "3d"):
x = ad[species_name, 'particle_position_x'].to_ndarray()
y = ad[species_name, 'particle_position_y'].to_ndarray()
z = ad[species_name, 'particle_position_z'].to_ndarray()
elif (dim == "rz"):
r = ad[species_name, 'particle_position_x'].to_ndarray()
z = ad[species_name, 'particle_position_y'].to_ndarray()
theta = ad[species_name, 'particle_theta'].to_ndarray()
## Load arrays from the filtered diagnostic
ids_filtered_warpx = ad_filtered[species_name, 'particle_id'].to_ndarray()
cpus_filtered_warpx = ad_filtered[species_name, 'particle_cpu'].to_ndarray()
px_filtered_warpx = ad_filtered[species_name, 'particle_momentum_x'].to_ndarray()
pz_filtered_warpx = ad_filtered[species_name, 'particle_momentum_z'].to_ndarray()
py_filtered_warpx = ad_filtered[species_name, 'particle_momentum_y'].to_ndarray()
w_filtered_warpx = ad_filtered[species_name, 'particle_weight'].to_ndarray()
if (dim == "2d"):
x_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
z_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
elif (dim == "3d"):
x_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
y_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
z_filtered_warpx = ad_filtered[species_name, 'particle_position_z'].to_ndarray()
elif (dim == "rz"):
r_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
z_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
theta_filtered_warpx = ad_filtered[species_name, 'particle_theta'].to_ndarray()
## Reproduce the filter in python: this returns the indices of the filtered particles in the
## unfiltered arrays.
ind_filtered_python, = np.where(eval(filter_expression))
## Sort the indices of the filtered arrays by particle id.
sorted_ind_filtered_python = ind_filtered_python[np.argsort(ids[ind_filtered_python])]
sorted_ind_filtered_warpx = np.argsort(ids_filtered_warpx)
## Check that the sorted ids are exactly the same with the warpx filter and the filter
## reproduced in python
assert(np.array_equal(ids[sorted_ind_filtered_python],
ids_filtered_warpx[sorted_ind_filtered_warpx]))
assert(np.array_equal(cpus[sorted_ind_filtered_python],
cpus_filtered_warpx[sorted_ind_filtered_warpx]))
## Finally, we check that the sum of the particles quantities are the same to machine precision
tolerance_checksum = 1.e-12
check_array_sum(px[sorted_ind_filtered_python],
px_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
check_array_sum(pz[sorted_ind_filtered_python],
pz_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
check_array_sum(py[sorted_ind_filtered_python],
py_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
check_array_sum(w[sorted_ind_filtered_python],
w_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
check_array_sum(z[sorted_ind_filtered_python],
z_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
if (dim == "2d"):
check_array_sum(x[sorted_ind_filtered_python],
x_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
elif (dim == "3d"):
check_array_sum(x[sorted_ind_filtered_python],
x_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
check_array_sum(y[sorted_ind_filtered_python],
y_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
elif (dim == "rz"):
check_array_sum(r[sorted_ind_filtered_python],
r_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
check_array_sum(theta[sorted_ind_filtered_python],
theta_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
## This function checks that the absolute sums of two arrays are the same to a required precision
def check_array_sum(array1, array2, tolerance_checksum):
sum1 = np.sum(np.abs(array1))
sum2 = np.sum(np.abs(array2))
assert(abs(sum2-sum1)/sum1 < tolerance_checksum)
## This function is specifically used to test the random filter. First, we check that the number of
## dumped particles is as expected. Next, we call the generic check_particle_filter function.
def check_random_filter(fn, filtered_fn, random_fraction, dim, species_name):
ds = yt.load( fn )
ds_filtered = yt.load( filtered_fn )
ad = ds.all_data()
ad_filtered = ds_filtered.all_data()
## Check that the number of particles is as expected
numparts = ad[species_name, 'particle_id'].to_ndarray().shape[0]
numparts_filtered = ad_filtered['particle_id'].to_ndarray().shape[0]
expected_numparts_filtered = random_fraction*numparts
# 5 sigma test that has an intrinsic probability to fail of 1 over ~2 millions
std_numparts_filtered = np.sqrt(expected_numparts_filtered)
error = abs(numparts_filtered-expected_numparts_filtered)
print("Random filter: difference between expected and actual number of dumped particles: " \
+ str(error))
print("tolerance: " + str(5*std_numparts_filtered))
assert(error<5*std_numparts_filtered)
## Dirty trick to find particles with the same ID + same CPU (does not work with more than 10
## MPI ranks)
random_filter_expression = 'np.isin(ids + 0.1*cpus,' \
'ids_filtered_warpx + 0.1*cpus_filtered_warpx)'
check_particle_filter(fn, filtered_fn, random_filter_expression, dim, species_name)
|