aboutsummaryrefslogtreecommitdiff
path: root/Regression/PostProcessingUtils/post_processing_utils.py
blob: 1ce398121155983d83015d55e7e01282748dbac3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright 2021-2021 Neil Zaim
#
# This file is part of WarpX.
#
# License: BSD-3-Clause-LBNL

## This file contains functions that are used in multiple CI analysis scripts.

import numpy as np
import yt

## This is a generic function to test a particle filter. We reproduce the filter in python and
## verify that the results are the same as with the WarpX filtered diagnostic.
def check_particle_filter(fn, filtered_fn, filter_expression, dim, species_name):
    ds  = yt.load( fn )
    ds_filtered  = yt.load( filtered_fn )
    ad  = ds.all_data()
    ad_filtered  = ds_filtered.all_data()

    ## Load arrays from the unfiltered diagnostic
    ids = ad[species_name, 'particle_id'].to_ndarray()
    cpus = ad[species_name, 'particle_cpu'].to_ndarray()
    px  = ad[species_name, 'particle_momentum_x'].to_ndarray()
    pz  = ad[species_name, 'particle_momentum_z'].to_ndarray()
    py  = ad[species_name, 'particle_momentum_y'].to_ndarray()
    w  = ad[species_name, 'particle_weight'].to_ndarray()
    if (dim == "2d"):
        x = ad[species_name, 'particle_position_x'].to_ndarray()
        z = ad[species_name, 'particle_position_y'].to_ndarray()
    elif (dim == "3d"):
        x = ad[species_name, 'particle_position_x'].to_ndarray()
        y = ad[species_name, 'particle_position_y'].to_ndarray()
        z = ad[species_name, 'particle_position_z'].to_ndarray()
    elif (dim == "rz"):
        r = ad[species_name, 'particle_position_x'].to_ndarray()
        z = ad[species_name, 'particle_position_y'].to_ndarray()
        theta = ad[species_name, 'particle_theta'].to_ndarray()

    ## Load arrays from the filtered diagnostic
    ids_filtered_warpx = ad_filtered[species_name, 'particle_id'].to_ndarray()
    cpus_filtered_warpx = ad_filtered[species_name, 'particle_cpu'].to_ndarray()
    px_filtered_warpx  = ad_filtered[species_name, 'particle_momentum_x'].to_ndarray()
    pz_filtered_warpx  = ad_filtered[species_name, 'particle_momentum_z'].to_ndarray()
    py_filtered_warpx  = ad_filtered[species_name, 'particle_momentum_y'].to_ndarray()
    w_filtered_warpx  = ad_filtered[species_name, 'particle_weight'].to_ndarray()
    if (dim == "2d"):
        x_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
        z_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
    elif (dim == "3d"):
        x_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
        y_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
        z_filtered_warpx = ad_filtered[species_name, 'particle_position_z'].to_ndarray()
    elif (dim == "rz"):
        r_filtered_warpx = ad_filtered[species_name, 'particle_position_x'].to_ndarray()
        z_filtered_warpx = ad_filtered[species_name, 'particle_position_y'].to_ndarray()
        theta_filtered_warpx = ad_filtered[species_name, 'particle_theta'].to_ndarray()

    ## Reproduce the filter in python: this returns the indices of the filtered particles in the
    ## unfiltered arrays.
    ind_filtered_python, = np.where(eval(filter_expression))

    ## Sort the indices of the filtered arrays by particle id.
    sorted_ind_filtered_python = ind_filtered_python[np.argsort(ids[ind_filtered_python])]
    sorted_ind_filtered_warpx = np.argsort(ids_filtered_warpx)

    ## Check that the sorted ids are exactly the same with the warpx filter and the filter
    ## reproduced in python
    assert(np.array_equal(ids[sorted_ind_filtered_python],
                             ids_filtered_warpx[sorted_ind_filtered_warpx]))
    assert(np.array_equal(cpus[sorted_ind_filtered_python],
                             cpus_filtered_warpx[sorted_ind_filtered_warpx]))

    ## Finally, we check that the sum of the particles quantities are the same to machine precision
    tolerance_checksum = 1.e-12
    check_array_sum(px[sorted_ind_filtered_python],
                    px_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    check_array_sum(pz[sorted_ind_filtered_python],
                    pz_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    check_array_sum(py[sorted_ind_filtered_python],
                    py_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    check_array_sum(w[sorted_ind_filtered_python],
                    w_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    check_array_sum(z[sorted_ind_filtered_python],
                    z_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    if (dim == "2d"):
        check_array_sum(x[sorted_ind_filtered_python],
                        x_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    elif (dim == "3d"):
        check_array_sum(x[sorted_ind_filtered_python],
                        x_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
        check_array_sum(y[sorted_ind_filtered_python],
                        y_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
    elif (dim == "rz"):
        check_array_sum(r[sorted_ind_filtered_python],
                        r_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)
        check_array_sum(theta[sorted_ind_filtered_python],
                        theta_filtered_warpx[sorted_ind_filtered_warpx], tolerance_checksum)

## This function checks that the absolute sums of two arrays are the same to a required precision
def check_array_sum(array1, array2, tolerance_checksum):
    sum1 = np.sum(np.abs(array1))
    sum2 = np.sum(np.abs(array2))
    assert(abs(sum2-sum1)/sum1 < tolerance_checksum)

## This function is specifically used to test the random filter. First, we check that the number of
## dumped particles is as expected. Next, we call the generic check_particle_filter function.
def check_random_filter(fn, filtered_fn, random_fraction, dim, species_name):
    ds  = yt.load( fn )
    ds_filtered  = yt.load( filtered_fn )
    ad  = ds.all_data()
    ad_filtered  = ds_filtered.all_data()

    ## Check that the number of particles is as expected
    numparts = ad[species_name, 'particle_id'].to_ndarray().shape[0]
    numparts_filtered = ad_filtered['particle_id'].to_ndarray().shape[0]
    expected_numparts_filtered = random_fraction*numparts
    # 5 sigma test that has an intrinsic probability to fail of 1 over ~2 millions
    std_numparts_filtered = np.sqrt(expected_numparts_filtered)
    error = abs(numparts_filtered-expected_numparts_filtered)
    print("Random filter: difference between expected and actual number of dumped particles: " \
          + str(error))
    print("tolerance: " + str(5*std_numparts_filtered))
    assert(error<5*std_numparts_filtered)

    ## Dirty trick to find particles with the same ID + same CPU (does not work with more than 10
    ## MPI ranks)
    random_filter_expression = 'np.isin(ids + 0.1*cpus,' \
                                          'ids_filtered_warpx + 0.1*cpus_filtered_warpx)'
    check_particle_filter(fn, filtered_fn, random_filter_expression, dim, species_name)