diff --git a/.gitignore b/.gitignore
index 59fa215f..9bcd34f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,10 @@
 **/buffer/*
 **/sed_config.yaml
 **/datasets.json
+copy_yaml_to_json.ipynb
+
+# local copies
+**/*.local.*
 
 # Byte-compiled / optimized / DLL files
 __pycache__/
@@ -145,3 +149,6 @@ dmypy.json
 
 # IDE stuff
 \.vscode
+
+# poetry local config
+poetry.toml
diff --git a/sed/binning/binning.py b/sed/binning/binning.py
index 4f904275..8646159f 100644
--- a/sed/binning/binning.py
+++ b/sed/binning/binning.py
@@ -267,7 +267,7 @@ def bin_dataframe(
             Defaults to None.
         pbar (bool, optional): Option to show the tqdm progress bar. Defaults to True.
        n_cores (int, optional): Number of CPU cores to use for parallelization.
-            Defaults to all but one of the available cores. Defaults to N_CPU-1.
+            Defaults to all but one of the available cores.
         threads_per_worker (int, optional): Limit the number of threads that
             multiprocessing can spawn. Defaults to 4.
         threadpool_api (str, optional): The API to use for multiprocessing.
diff --git a/sed/calibrator/delay.py b/sed/calibrator/delay.py
index 78dd1e3b..666acbb7 100644
--- a/sed/calibrator/delay.py
+++ b/sed/calibrator/delay.py
@@ -144,9 +144,10 @@ def append_delay_axis(
             t0_key = self._config["delay"].get("t0_key", "")
 
         if "adc_range" not in calibration.keys():
-            calibration["adc_range"] = np.asarray(
-                self._config["delay"]["adc_range"],
-            ) / 2 ** (self._config["dataframe"]["adc_binning"] - 1)
+            calibration["adc_range"] = (
+                np.asarray(self._config["delay"]["adc_range"])
+                / self._config["dataframe"]["adc_binning"]
+            )
 
         if "delay_range" not in calibration.keys():
             if "delay_range_mm" not in calibration.keys() or "time0" not in calibration.keys():
diff --git a/sed/calibrator/energy.py b/sed/calibrator/energy.py
index b3cbefc8..3eefef60 100644
--- a/sed/calibrator/energy.py
+++ b/sed/calibrator/energy.py
@@ -105,10 +105,8 @@ def __init__(
         self.binning: int = self._config["dataframe"]["tof_binning"]
         self.x_width = self._config["energy"]["x_width"]
         self.y_width = self._config["energy"]["y_width"]
-        self.tof_width = np.asarray(
-            self._config["energy"]["tof_width"],
-        ) / 2 ** (self.binning - 1)
-        self.tof_fermi = self._config["energy"]["tof_fermi"] / 2 ** (self.binning - 1)
+        self.tof_width = np.asarray(self._config["energy"]["tof_width"]) / self.binning
+        self.tof_fermi = self._config["energy"]["tof_fermi"] / self.binning
         self.color_clip = self._config["energy"]["color_clip"]
         self.sector_delays = self._config["dataframe"].get("sector_delays", None)
         self.sector_id_column = self._config["dataframe"].get("sector_id_column", None)
@@ -204,26 +202,21 @@ def bin_data(
         if bins is None:
             bins = [self._config["energy"]["bins"]]
         if ranges is None:
-            ranges_ = [
-                np.array(self._config["energy"]["ranges"]) / 2 ** (self.binning - 1),
-            ]
+            ranges_ = [np.array(self._config["energy"]["ranges"]) / self.binning]
             ranges = [cast(tuple[float, float], tuple(v)) for v in ranges_]
 
         # pylint: disable=duplicate-code
         hist_mode = kwds.pop("hist_mode", self._config["binning"]["hist_mode"])
         mode = kwds.pop("mode", self._config["binning"]["mode"])
         pbar = kwds.pop("pbar", self._config["binning"]["pbar"])
         try:
-            num_cores = kwds.pop("num_cores", self._config["binning"]["num_cores"])
+            num_cores = kwds.pop("num_cores", self._config["core"]["num_cores"])
         except KeyError:
             num_cores = psutil.cpu_count() - 1
         threads_per_worker = kwds.pop(
             "threads_per_worker",
             self._config["binning"]["threads_per_worker"],
         )
-        threadpool_api = kwds.pop(
-            "threadpool_API",
-            self._config["binning"]["threadpool_API"],
-        )
+        threadpool_api = kwds.pop("threadpool_API", self._config["binning"]["threadpool_API"])
 
         read_biases = False
         if biases is None:
@@ -2171,10 +2164,7 @@ def residual(pars, time, data, binwidth, binning, energy_scale):
         name="t0",
         value=t0_pars.get("value", 1e-6),
         min=t0_pars.get("min", -np.inf),
-        max=t0_pars.get(
-            "max",
-            (min(pos) - 1) * binwidth * 2**binning,
-        ),
+        max=t0_pars.get("max", (min(pos) - 1) * binwidth * binning),
         vary=t0_pars.get("vary", True),
     )
     E0_pars = kwds.pop("E0", {})  # pylint: disable=invalid-name
@@ -2364,7 +2354,7 @@ def tof2ev(
     #         m_e/2 [eV]          bin width [s]
     energy = (
-        2.84281e-12 * sign * (tof_distance / (t * binwidth * 2**binning - time_offset)) ** 2
+        2.84281e-12 * sign * (tof_distance / (t * binwidth * binning - time_offset)) ** 2
         + energy_offset
     )
@@ -2414,5 +2404,5 @@ def tof2ns(
     Returns:
         float: Converted time in nanoseconds.
     """
-    val = t * 1e9 * binwidth * 2.0**binning
+    val = t * 1e9 * binwidth * binning
     return val
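The common thread in the calibrator changes above is the binning convention: reference values stored in unbinned detector coordinates used to be divided by 2 ** (binning - 1), and are now divided by binning directly. A minimal sketch of the new convention (the helper name is hypothetical):

    import numpy as np

    def to_binned(values, binning):
        # new convention: ``binning`` is the number of base steps per bin,
        # i.e. a plain divisor (the code formerly divided by 2 ** (binning - 1))
        return np.asarray(values) / binning

    # e.g. a Fermi-edge position of 264500 unbinned TOF steps at tof_binning=4
    print(to_binned(264500, 4))  # -> 66125.0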
self._config["binning"]["threadpool_API"], - ) + threadpool_api = kwds.pop("threadpool_API", self._config["binning"]["threadpool_API"]) read_biases = False if biases is None: @@ -2171,10 +2164,7 @@ def residual(pars, time, data, binwidth, binning, energy_scale): name="t0", value=t0_pars.get("value", 1e-6), min=t0_pars.get("min", -np.inf), - max=t0_pars.get( - "max", - (min(pos) - 1) * binwidth * 2**binning, - ), + max=t0_pars.get("max", (min(pos) - 1) * binwidth * binning), vary=t0_pars.get("vary", True), ) E0_pars = kwds.pop("E0", {}) # pylint: disable=invalid-name @@ -2364,7 +2354,7 @@ def tof2ev( # m_e/2 [eV] bin width [s] energy = ( - 2.84281e-12 * sign * (tof_distance / (t * binwidth * 2**binning - time_offset)) ** 2 + 2.84281e-12 * sign * (tof_distance / (t * binwidth * binning - time_offset)) ** 2 + energy_offset ) @@ -2414,5 +2404,5 @@ def tof2ns( Returns: float: Converted time in nanoseconds. """ - val = t * 1e9 * binwidth * 2.0**binning + val = t * 1e9 * binwidth * binning return val diff --git a/sed/config/default.yaml b/sed/config/default.yaml index d0f779be..b047d8a8 100644 --- a/sed/config/default.yaml +++ b/sed/config/default.yaml @@ -31,9 +31,9 @@ dataframe: delay_column: "delay" # time length of a base time-of-flight bin in s tof_binwidth: 4.125e-12 - # Binning factor of the tof_column-data compared to tof_binwidth (2^(tof_binning-1)) + # Binning factor of the tof_column-data compared to tof_binwidth tof_binning: 1 - # binning factor used for the adc coordinate (2^(adc_binning-1)) + # binning factor used for the adc coordinate adc_binning: 1 # list of columns to apply jitter to. jitter_cols: ["@x_column", "@y_column", "@tof_column"] @@ -45,7 +45,7 @@ dataframe: energy: # Number of bins to use for energy calibration traces bins: 1000 - # Bin ranges to use for energy calibration curves (for tof_binning=0) + # Bin ranges to use for energy calibration curves (for tof_binning=1) ranges: [100000, 150000] # Option to normalize energy calibration traces normalize: True diff --git a/sed/config/flash_example_config.yaml b/sed/config/flash_example_config.yaml index 7f4cf51b..d9ce9184 100644 --- a/sed/config/flash_example_config.yaml +++ b/sed/config/flash_example_config.yaml @@ -3,6 +3,8 @@ core: # defines the loader loader: flash + # Since this will run on maxwell most probably, we have a lot of cores at our disposal + num_cores: 100 # the beamline where experiment took place beamline: pg2 # the ID number of the beamtime @@ -21,8 +23,16 @@ core: data_parquet_dir: "tests/data/loader/flash/parquet" binning: - # Since this will run on maxwell most probably, we have a lot of cores at our disposal - num_cores: 100 + # Histogram computation mode to use. + hist_mode: "numba" + # Mode for histogram recombination to use + mode: fast + # Whether to display a progress bar + pbar: True + # Number of multithreading threads per worker thread + threads_per_worker: 4 + # API for numpy multithreading + threadpool_API: "blas" dataframe: # The name of the DAQ system to use. Necessary to resolve the filenames/paths. @@ -58,8 +68,8 @@ dataframe: time_stamp_alias: timeStamp # time length of a base time-of-flight bin in seconds tof_binwidth: 2.0576131995767355E-11 - # binning parameter for time-of-flight data. 2**tof_binning bins per base bin - tof_binning: 3 # power of 2, 3 means 8 bins per step + # binning parameter for time-of-flight data. + tof_binning: 8 # dataframe column containing sector ID. 
diff --git a/sed/config/mpes_example_config.yaml b/sed/config/mpes_example_config.yaml
index 37a2b5ae..5aa99157 100644
--- a/sed/config/mpes_example_config.yaml
+++ b/sed/config/mpes_example_config.yaml
@@ -1,6 +1,8 @@
 core:
   # The loader to use. The mpes loader allows for loading hdf5 files from the METIS momentum microscope.
   loader: mpes
+  # Number of parallel threads to use for parallelized jobs (e.g. binning, data conversion, copy, ...)
+  num_cores: 20
   # Option to use the copy tool to mirror data to a local storage location before processing.
   use_copy_tool: False
   # path to the root of the source data directory
@@ -9,8 +11,6 @@ core:
   copy_tool_dest: "/path/to/localDataStore/"
   # optional keywords for the copy tool:
   copy_tool_kwds:
-    # number of parallel copy jobs
-    ntasks: 20
     # group id to set for copied files and folders
     gid: 1001
 
@@ -57,10 +57,10 @@ dataframe:
   delay_column: "delay"
   # time length of a base time-of-flight bin in ns
   tof_binwidth: 4.125e-12
-  # Binning factor of the tof_column-data compared to tof_binwidth (2^(tof_binning-1))
-  tof_binning: 2
-  # binning factor used for the adc coordinate (2^(adc_binning-1))
-  adc_binning: 3
+  # Binning factor of the tof_column-data compared to tof_binwidth
+  tof_binning: 4
+  # binning factor used for the adc coordinate
+  adc_binning: 4
   # Default units for dataframe entries
   units:
     X: 'step'
@@ -82,8 +82,8 @@ dataframe:
 energy:
   # Number of bins to use for energy calibration traces
   bins: 1000
-  # Bin ranges to use for energy calibration curves (for tof_binning=0)
-  ranges: [128000, 138000]
+  # Bin ranges to use for energy calibration curves (for tof_binning=1)
+  ranges: [256000, 276000]
   # hdf5 path to attribute storing bias information for a given file
   bias_key: "@KTOF:Lens:Sample:V"
   # Option to normalize energy calibration traces
@@ -102,7 +102,7 @@ energy:
   energy_scale: "kinetic"
   # Approximate position of the high-energy-cutoff in tof_column bins,
   # used for displaying a graph to choose the energy correction function parameters.
-  tof_fermi: 132250
+  tof_fermi: 264500
   # TOF range to visualize for the correction tool around tof_fermi
   tof_width: [-600, 1000]
   # x-integration range for the correction tool around the center pixel
@@ -142,7 +142,7 @@ momentum:
   # Bin numbers used for the respective axes
   bins: [512, 512, 300]
   # bin ranges to use (in unbinned detector coordinates)
-  ranges: [[-256, 1792], [-256, 1792], [132000, 136000]]
+  ranges: [[-256, 1792], [-256, 1792], [264000, 272000]]
   # The x/y pixel ranges of the detector
   detector_ranges: [[0, 2048], [0, 2048]]
   # The center pixel of the detector in the binned x/y coordinates
@@ -199,8 +199,6 @@ binning:
   mode: "fast"
   # Whether to display a progress bar
   pbar: True
-  # Number of parallel binning threads to use
-  num_cores: 20
   # Number of multithreading threads per worker thread
   threads_per_worker: 4
   # API for numpy multithreading
@@ -213,7 +211,7 @@ histogram:
   # Axes names starting with "@" refer to keys in the "dataframe" section
   axes: ["@x_column", "@y_column", "@tof_column", "@adc_column"]
   # default ranges to use for histogram visualization (in unbinned detector coordinates)
-  ranges: [[0, 1800], [0, 1800], [128000, 138000], [0, 32000]]
+  ranges: [[0, 1800], [0, 1800], [256000, 276000], [0, 32000]]
 
 metadata:
   # URL of the epics archiver request engine
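The doubled mpes reference values remain consistent with the old effective binning: 128000 TOF steps divided by the old factor 2 ** (2 - 1) lands in the same bin as 256000 divided by the new tof_binning of 4. A quick check using the values from the config above:

    old = 128000 / 2 ** (2 - 1)  # old convention, tof_binning=2
    new = 256000 / 4             # new convention, tof_binning=4
    assert old == new == 64000.0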
diff --git a/tutorial/sxp_config.yaml b/sed/config/sxp_example_config.yaml
similarity index 85%
rename from tutorial/sxp_config.yaml
rename to sed/config/sxp_example_config.yaml
index 70e7d163..c0757fa5 100644
--- a/tutorial/sxp_config.yaml
+++ b/sed/config/sxp_example_config.yaml
@@ -1,5 +1,7 @@
 core:
   loader: sxp
+  # Since this will run on maxwell most probably, we have a lot of cores at our disposal
+  num_cores: 100
   beamtime_id: p005639
   year: 202302
   beamline: sxp
@@ -10,7 +12,16 @@ core:
   data_parquet_dir: "/path/to/parquet"
 
 binning:
-  num_cores: 10
+  # Histogram computation mode to use.
+  hist_mode: "numba"
+  # Mode for histogram recombination to use
+  mode: fast
+  # Whether to display a progress bar
+  pbar: True
+  # Number of multithreading threads per worker thread
+  threads_per_worker: 4
+  # API for numpy multithreading
+  threadpool_API: "blas"
 
 dataframe:
   ubid_offset: 0
@@ -28,7 +39,7 @@ dataframe:
   corrected_tof_column: "tm"
   bias_column: "sampleBias"
   tof_binwidth: 6.875E-12  # in seconds
-  tof_binning: 0
+  tof_binning: 1
   jitter_cols: ["dldPosX", "dldPosY", "dldTimeSteps"]
 
   units:
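The sxp change from tof_binning: 0 to 1 is more than a renaming: under the old 2 ** (binning - 1) rule a value of 0 would have meant a divisor of 0.5, i.e. reference values were effectively doubled rather than left untouched, whereas under the linear convention 1 is the natural no-binning value:

    old_divisor = 2 ** (0 - 1)  # 0.5 -- dividing by it doubled the values
    new_divisor = 1             # no binning
    assert old_divisor == 0.5 and new_divisor == 1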
diff --git a/sed/core/processor.py b/sed/core/processor.py
index b334c1a8..1c1398e2 100644
--- a/sed/core/processor.py
+++ b/sed/core/processor.py
@@ -102,10 +102,10 @@ def __init__(
         for key in config_kwds.keys():
             del kwds[key]
         self._config = parse_config(config, **config_kwds)
-        num_cores = self._config.get("binning", {}).get("num_cores", N_CPU - 1)
+        num_cores = self._config["core"].get("num_cores", N_CPU - 1)
         if num_cores >= N_CPU:
             num_cores = N_CPU - 1
-        self._config["binning"]["num_cores"] = num_cores
+        self._config["core"]["num_cores"] = num_cores
 
         if verbose is None:
             self.verbose = self._config["core"].get("verbose", False)
@@ -154,6 +154,7 @@ def __init__(
                 self.ct = CopyTool(
                     source=self._config["core"]["copy_tool_source"],
                     dest=self._config["core"]["copy_tool_dest"],
+                    num_cores=self._config["core"]["num_cores"],
                     **self._config["core"].get("copy_tool_kwds", {}),
                 )
             except KeyError:
@@ -2105,9 +2106,7 @@ def pre_binning(
             bins = self._config["momentum"]["bins"]
         if ranges is None:
             ranges_ = list(self._config["momentum"]["ranges"])
-            ranges_[2] = np.asarray(ranges_[2]) / 2 ** (
-                self._config["dataframe"]["tof_binning"] - 1
-            )
+            ranges_[2] = np.asarray(ranges_[2]) / self._config["dataframe"]["tof_binning"]
             ranges = [cast(tuple[float, float], tuple(v)) for v in ranges_]
 
         assert self._dataframe is not None, "dataframe needs to be loaded first!"
@@ -2162,7 +2161,7 @@ def compute(
                 - **pbar**: Option to show the tqdm progress bar. Defaults to
                   config["binning"]["pbar"].
                 - **n_cores**: Number of CPU cores to use for parallelization.
-                  Defaults to config["binning"]["num_cores"] or N_CPU-1.
+                  Defaults to config["core"]["num_cores"] or N_CPU-1.
                 - **threads_per_worker**: Limit the number of threads that
                   multiprocessing can spawn per binning thread. Defaults to
                   config["binning"]["threads_per_worker"].
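Both SedProcessor (above) and CopyTool (below) now clamp the configured core count to the hardware, so the maxwell-sized defaults (num_cores: 100) degrade gracefully on smaller machines. The guard in isolation, with the count standing in for config["core"]["num_cores"]:

    import psutil

    N_CPU = psutil.cpu_count()

    num_cores = 100  # e.g. config["core"]["num_cores"]
    if num_cores >= N_CPU:
        num_cores = N_CPU - 1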
@@ -2189,7 +2188,7 @@ def compute(
         hist_mode = kwds.pop("hist_mode", self._config["binning"]["hist_mode"])
         mode = kwds.pop("mode", self._config["binning"]["mode"])
         pbar = kwds.pop("pbar", self._config["binning"]["pbar"])
-        num_cores = kwds.pop("num_cores", self._config["binning"]["num_cores"])
+        num_cores = kwds.pop("num_cores", self._config["core"]["num_cores"])
         threads_per_worker = kwds.pop(
             "threads_per_worker",
             self._config["binning"]["threads_per_worker"],
@@ -2407,13 +2406,9 @@ def view_event_histogram(
             ranges = list(self._config["histogram"]["ranges"])
             for loc, axis in enumerate(axes):
                 if axis == self._config["dataframe"]["tof_column"]:
-                    ranges[loc] = np.asarray(ranges[loc]) / 2 ** (
-                        self._config["dataframe"]["tof_binning"] - 1
-                    )
+                    ranges[loc] = np.asarray(ranges[loc]) / self._config["dataframe"]["tof_binning"]
                 elif axis == self._config["dataframe"]["adc_column"]:
-                    ranges[loc] = np.asarray(ranges[loc]) / 2 ** (
-                        self._config["dataframe"]["adc_binning"] - 1
-                    )
+                    ranges[loc] = np.asarray(ranges[loc]) / self._config["dataframe"]["adc_binning"]
 
         input_types = map(type, [axes, bins, ranges])
         allowed_types = [list, tuple]
diff --git a/sed/loader/mirrorutil.py b/sed/loader/mirrorutil.py
index d6db66e3..03d4a44b 100644
--- a/sed/loader/mirrorutil.py
+++ b/sed/loader/mirrorutil.py
@@ -13,6 +13,7 @@
 from datetime import datetime
 
 import dask as d
+import psutil
 from dask.diagnostics import ProgressBar
 
 
@@ -36,11 +37,13 @@ def __init__(
             "safetyMargin",
             1 * 2**30,
         )  # Default 500 GB safety margin
-        self.gid = kwds.pop("gid", 5050)
+        self.gid = kwds.pop("gid", 1001)
         self.scheduler = kwds.pop("scheduler", "threads")
 
-        # Default to 25 concurrent copy tasks
-        self.ntasks = int(kwds.pop("ntasks", 25))
+        # Default to 20 concurrent copy tasks
+        self.num_cores = kwds.pop("num_cores", 20)
+        if self.num_cores >= psutil.cpu_count():
+            self.num_cores = psutil.cpu_count() - 1
 
     def copy(
         self,
@@ -162,7 +165,7 @@ def copy(
             d.compute(
                 *copy_tasks,
                 scheduler=self.scheduler,
-                num_workers=self.ntasks,
+                num_workers=self.num_cores,
                 **compute_kwds,
             )
             print("Copy finished!")
diff --git a/tests/calibrator/test_energy.py b/tests/calibrator/test_energy.py
index 80c7cbef..378ce095 100644
--- a/tests/calibrator/test_energy.py
+++ b/tests/calibrator/test_energy.py
@@ -181,7 +181,7 @@ def test_calibrate_append(energy_scale: str, calibration_method: str) -> None:
         calibration_method (str): method used for calibration
     """
     config = parse_config(
-        config={"dataframe": {"tof_binning": 2}},
+        config={"dataframe": {"tof_binning": 4}},
         folder_config={},
         user_config={},
         system_config={},
@@ -281,7 +281,7 @@ def test_append_tof_ns_axis() -> None:
             "dataframe": {
                 "tof_column": "t",
                 "tof_ns_column": "t_ns",
-                "tof_binning": 1,
+                "tof_binning": 2,
                 "tof_binwidth": 1e-9,
             },
         }
@@ -291,7 +291,7 @@ def test_append_tof_ns_axis() -> None:
     # from kwds
     df, _, _ = loader.read_dataframe(folders=df_folder, collect_metadata=False)
     ec = EnergyCalibrator(config=config, loader=loader)
-    df, _ = ec.append_tof_ns_axis(df, binwidth=2e-9, binning=1)
+    df, _ = ec.append_tof_ns_axis(df, binwidth=2e-9, binning=2)
     assert config["dataframe"]["tof_ns_column"] in df.columns
     np.testing.assert_allclose(df[ec.tof_column], df[ec.tof_ns_column] / 4)
@@ -303,7 +303,7 @@ def test_append_tof_ns_axis() -> None:
     np.testing.assert_allclose(df[ec.tof_column], df[ec.tof_ns_column] / 2)
 
 
-amplitude = 2.5  # pylint: disable=invalid-name
+amplitude = 2.5
 center = (730, 730)
 sample = np.array(
     [
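The updated test values follow directly from the new tof2ns formula, val = t * 1e9 * binwidth * binning: with binwidth=2e-9 s and binning=2 each TOF step spans 4 ns, hence the assertion that tof_column equals tof_ns_column / 4. Worked out standalone:

    def tof2ns(t, binwidth, binning):
        # linear convention, as in sed/calibrator/energy.py above
        return t * 1e9 * binwidth * binning

    assert tof2ns(1, binwidth=2e-9, binning=2) == 4.0  # 4 ns per step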
diff --git a/tutorial/5_sxp_workflow.ipynb b/tutorial/5_sxp_workflow.ipynb
index f7a89220..bb7c6a7b 100644
--- a/tutorial/5_sxp_workflow.ipynb
+++ b/tutorial/5_sxp_workflow.ipynb
@@ -32,8 +32,8 @@
    },
    "outputs": [],
    "source": [
-    "local_path = Path(sed.__file__).parent.parent / \"tutorial/\"\n",
-    "config_file = local_path / \"sxp_config.yaml\"\n",
+    "# pick the default configuration file for the SXP experiment\n",
+    "config_file = Path('../sed/config/sxp_example_config.yaml')\n",
     "assert config_file.exists()"
   ]
  },
diff --git a/tutorial/6_binning_with_time-stamped_data.ipynb b/tutorial/6_binning_with_time-stamped_data.ipynb
index 230386eb..4a103808 100644
--- a/tutorial/6_binning_with_time-stamped_data.ipynb
+++ b/tutorial/6_binning_with_time-stamped_data.ipynb
@@ -68,7 +68,7 @@
    "outputs": [],
    "source": [
     "# create sed processor using the config file with time-stamps:\n",
-    "sp = sed.SedProcessor(folder=scandir, user_config=\"../sed/config/mpes_example_config.yaml\", time_stamps=True)"
+    "sp = sed.SedProcessor(folder=scandir, user_config=\"../sed/config/mpes_example_config.yaml\", time_stamps=True, verbose=True)"
   ]
  },
 {
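For reference, a processor built from the updated mpes example config picks up core.num_cores automatically and forwards it to the copy tool; the scandir path below is a placeholder for the data folder defined earlier in the tutorial:

    import sed

    scandir = "/path/to/scan/data"  # placeholder
    sp = sed.SedProcessor(
        folder=scandir,
        user_config="../sed/config/mpes_example_config.yaml",
        time_stamps=True,
        verbose=True,
    )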