diff --git a/.cspell/custom-dictionary.txt b/.cspell/custom-dictionary.txt index 5123e47d..c3a3530b 100644 --- a/.cspell/custom-dictionary.txt +++ b/.cspell/custom-dictionary.txt @@ -43,6 +43,8 @@ cdeform cdeformfield cdisp centroidnn +cfel +CFEL chessy clim cmap @@ -63,6 +65,7 @@ cryo cstart cstep csvfile +cumsum custom-dictionary cval cvdist @@ -169,6 +172,7 @@ joblib jpars jupyterlab kernelspec +kmic kmodem KTOF kwds diff --git a/src/sed/config/flash_example_config.yaml b/src/sed/config/flash_example_config.yaml index 232a56bb..695b0520 100644 --- a/src/sed/config/flash_example_config.yaml +++ b/src/sed/config/flash_example_config.yaml @@ -10,8 +10,6 @@ core: beamtime_id: 11019101 # the year of the beamtime year: 2023 - # the instrument used - instrument: hextof # hextof, wespe, etc # The paths to the raw and parquet data directories. If these are not # provided, the loader will try to find the data based on year beamtimeID etc # paths: @@ -32,6 +30,7 @@ core: # (Not to be changed by user) beamtime_dir: pg2: "/asap3/flash/gpfs/pg2/" + cfel: "/asap3/fs-flash-o/gpfs/hextof/" binning: # Histogram computation mode to use. @@ -60,6 +59,11 @@ dataframe: # Columns used for jitter correction jitter_cols: [dldPosX, dldPosY, dldTimeSteps] + # The index and formats of the data + index: [trainId, pulseId, electronId] + formats: [per_train, per_pulse, per_electron] + fill_formats: [per_train, per_pulse] # Channels with this format will be forward filled + # Column settings columns: x: dldPosX @@ -211,8 +215,7 @@ dataframe: # metadata collection from scicat # metadata: -# scicat_url: -# scicat_token: +# archiver_url: # The nexus collection routine shall be finalized soon for both instruments nexus: diff --git a/src/sed/config/lab_example_config.yaml b/src/sed/config/lab_example_config.yaml new file mode 100644 index 00000000..1b88e4af --- /dev/null +++ b/src/sed/config/lab_example_config.yaml @@ -0,0 +1,160 @@ +# This file contains the default configuration for the flash loader. + +core: + # defines the loader + loader: cfel + # Since this will run on maxwell most probably, we have a lot of cores at our disposal + num_cores: 10 + # the ID number of the beamtime + beamtime_id: 11021732 + # the year of the beamtime + year: 2025 + + # The paths to the raw and parquet data directories. If these are not + # provided, the loader will try to find the data based on year beamtimeID etc + paths: + # location of the raw data. + raw: "/asap3/fs-flash-o/gpfs/hextof/2025/data/11021732/raw/" + # location of the intermediate parquet files. + processed: "." + + # The beamtime directories for different DAQ systems. + # (Not to be changed by user) + beamtime_dir: + pg2: "/asap3/flash/gpfs/pg2/" + cfel: "/asap3/fs-flash-o/gpfs/hextof/" + + +dataframe: + daq: fl1user3 # DAQ system name to resolve filenames/paths + ubid_offset: 5 # Offset correction to the pulseId + forward_fill_iterations: 0 # Number of iterations to fill the pulseId forward + split_sector_id_from_dld_time: True # Remove reserved bits for dldSectorID from dldTimeSteps column + sector_id_reserved_bits: 3 # Bits reserved for dldSectorID in the dldTimeSteps column + sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.] 
# Sector delays + + first_event_time_stamp_key: /ScanParam/StartTime + ms_markers_key: /SlowData/exposure_time + + # Time and binning settings + tof_binwidth: 2.0576131995767355E-11 # Base time-of-flight bin width in seconds + tof_binning: 8 # Binning parameter for time-of-flight data + + # Columns used for jitter correction + index: [countId] + jitter_cols: [dldPosX, dldPosY, dldTimeSteps] + formats: [per_file, per_train, per_electron] + fill_formats: [per_train] # Channels with this format will be forward filled + + # Column settings + columns: + x: dldPosX + corrected_x: X + kx: kx + y: dldPosY + corrected_y: Y + ky: ky + tof: dldTimeSteps + tof_ns: dldTime + corrected_tof: tm + timestamp: timeStamp + auxiliary: dldAux + sector_id: dldSectorID + delay: delayStage + corrected_delay: pumpProbeTime + + units: + # These are the units of the columns + dldPosX: 'step' + dldPosY: 'step' + dldTimeSteps: 'step' + tof_voltage: 'V' + extractorVoltage: 'V' + extractorCurrent: 'A' + cryoTemperature: 'K' + sampleTemperature: 'K' + dldTime: 'ns' + delay: 'ps' + timeStamp: 's' + energy: 'eV' + E: 'eV' + kx: '1/A' + ky: '1/A' + + # The channels to load. + # channels have the following structure: + # : + # format: per_pulse/per_electron/per_train + # index_key: the hdf5 index key + # dataset_key: the hdf5 dataset key + # slice: int to slice a multidimensional data along axis=1. If not defined, there is no slicing + # dtype: the datatype of the data + # subChannels: further aliases for if the data is multidimensional and needs to be split in different cols + # used currently for the auxiliary channel + # : + # slice: int to slice a multidimensional data along axis=1. Must be defined + # dtype: the datatype of the data + + channels: + # event key + countId: + format: per_file + dataset_key: /DLD/NumOfEvents + # detector x position + dldPosX: + format: per_electron + dataset_key: /DLD/DLD/xPos + # dtype: uint32 + + # detector y position + dldPosY: + format: per_electron + dataset_key: /DLD/DLD/yPos + # dtype: uint32 + + # Detector time-of-flight channel + # if split_sector_id_from_dld_time is set to True, This this will generate + # also the dldSectorID channel + dldTimeSteps: + format: per_electron + dataset_key: /DLD/DLD/times + # dtype: uint32 + + # The auxiliary channel has a special structure where the group further contains + # a multidimensional structure so further aliases are defined below + dldAux: + format: per_train + dataset_key: "/SlowData/hextof/dld/info/Aux" + sub_channels: + sampleBias: + slice: 0 + dtype: float32 + tofVoltage: + slice: 1 + dtype: float64 + extractorVoltage: + slice: 2 + extractorCurrent: + slice: 3 + cryoTemperature: + slice: 4 + sampleTemperature: + slice: 5 + dldTimeBinSize: + slice: 15 + + vuRead: + format: per_train + dataset_key: /SlowData/hextof/logic/kmic1/Sample_VURead + + + +# metadata collection from scicat +# metadata: +# archiver_url: + +# The nexus collection routine shall be finalized soon for both instruments +# nexus: +# reader: "mpes" +# definition: "NXmpes" +# input_files: ["NXmpes_config-HEXTOF.json"] diff --git a/src/sed/core/config_model.py b/src/sed/core/config_model.py index 6379b639..b841999f 100644 --- a/src/sed/core/config_model.py +++ b/src/sed/core/config_model.py @@ -26,6 +26,7 @@ class PathsModel(BaseModel): raw: DirectoryPath processed: Optional[Union[DirectoryPath, NewPath]] = None + meta: Optional[Union[DirectoryPath, NewPath]] = None class CopyToolModel(BaseModel): @@ -57,7 +58,6 @@ class CoreModel(BaseModel): num_cores: 
Optional[PositiveInt] = None year: Optional[int] = None beamtime_id: Optional[Union[int, str]] = None - instrument: Optional[str] = None beamline: Optional[str] = None copy_tool: Optional[CopyToolModel] = None stream_name_prefixes: Optional[dict] = None @@ -140,6 +140,9 @@ class DataframeModel(BaseModel): sector_id_reserved_bits: Optional[int] = None sector_delays: Optional[Sequence[float]] = None daq: Optional[str] = None + index: Optional[Sequence[str]] = None + formats: Optional[Union[Sequence[str], str]] = None + fill_formats: Optional[Union[Sequence[str], str]] = None # SXP specific settings num_trains: Optional[PositiveInt] = None num_pulses: Optional[PositiveInt] = None diff --git a/src/sed/loader/cfel/__init__.py b/src/sed/loader/cfel/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/sed/loader/cfel/buffer_handler.py b/src/sed/loader/cfel/buffer_handler.py new file mode 100644 index 00000000..a84ba2ad --- /dev/null +++ b/src/sed/loader/cfel/buffer_handler.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import time +from pathlib import Path + +import dask.dataframe as dd + +from sed.core.logging import setup_logging +from sed.loader.cfel.dataframe import DataFrameCreator +from sed.loader.flash.buffer_handler import BufferFilePaths +from sed.loader.flash.buffer_handler import BufferHandler as BaseBufferHandler +from sed.loader.flash.utils import InvalidFileError +from sed.loader.flash.utils import get_channels +from sed.loader.flash.utils import get_dtypes + +logger = setup_logging("cfel_buffer_handler") + + +class BufferHandler(BaseBufferHandler): + """ + A class for handling the creation and manipulation of buffer files using DataFrameCreator. + """ + + def __init__( + self, + config: dict, + ) -> None: + """ + Initializes the BufferHandler. + + Args: + config (dict): The configuration dictionary. 
+ """ + super().__init__(config) + + def _validate_h5_files(self, config, h5_paths: list[Path]) -> list[Path]: + valid_h5_paths = [] + for h5_path in h5_paths: + try: + dfc = DataFrameCreator(config_dataframe=config, h5_path=h5_path) + dfc.validate_channel_keys() + valid_h5_paths.append(h5_path) + except InvalidFileError as e: + logger.info(f"Skipping invalid file: {h5_path.stem}\n{e}") + + return valid_h5_paths + + def _save_buffer_file(self, paths: dict[str, Path]) -> None: + """Creates the electron and timed buffer files from the raw H5 file.""" + logger.debug(f"Processing file: {paths['raw'].stem}") + start_time = time.time() + + # Create DataFrameCreator and get get dataframe + dfc = DataFrameCreator(config_dataframe=self._config, h5_path=paths["raw"]) + df = dfc.df + + # Save electron resolved dataframe + electron_channels = get_channels(self._config, "per_electron") + dtypes = get_dtypes(self._config, df.columns.values) + electron_df = df.dropna(subset=electron_channels).astype(dtypes).reset_index() + logger.debug(f"Saving electron buffer with shape: {electron_df.shape}") + electron_df.to_parquet(paths["electron"]) + + # Create and save timed dataframe + df_timed = dfc.df_timed + dtypes = get_dtypes(self._config, df_timed.columns.values) + timed_df = df_timed.astype(dtypes) + logger.debug(f"Saving timed buffer with shape: {timed_df.shape}") + timed_df.to_parquet(paths["timed"]) + + logger.debug(f"Processed {paths['raw'].stem} in {time.time() - start_time:.2f}s") + + def process_and_load_dataframe( + self, + h5_paths: list[Path], + folder: Path, + force_recreate: bool = False, + suffix: str = "", + debug: bool = False, + remove_invalid_files: bool = False, + filter_timed_by_electron: bool = True, + ) -> tuple[dd.DataFrame, dd.DataFrame]: + """ + Runs the buffer file creation process. + Does a schema check on the buffer files and creates them if they are missing. + Performs forward filling and splits the sector ID from the DLD time lazily. + + Args: + h5_paths (List[Path]): List of paths to H5 files. + folder (Path): Path to the folder for processed files. + force_recreate (bool): Flag to force recreation of buffer files. + suffix (str): Suffix for buffer file names. + debug (bool): Flag to enable debug mode.): + remove_invalid_files (bool): Flag to remove invalid files. + filter_timed_by_electron (bool): Flag to filter timed data by valid electron events. + + Returns: + Tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes. + """ + self.filter_timed_by_electron = filter_timed_by_electron + if remove_invalid_files: + h5_paths = self._validate_h5_files(self._config, h5_paths) + + self.fp = BufferFilePaths(h5_paths, folder, suffix) + + if not force_recreate: + schema_set = set( + get_channels(self._config, formats="all", index=True, extend_aux=True) + + [self._config["columns"].get("timestamp")], + ) + self._schema_check(self.fp["timed"], schema_set) + + self._schema_check(self.fp["electron"], schema_set) + + self._save_buffer_files(force_recreate, debug) + + self._get_dataframes() + + return self.df["electron"], self.df["timed"] diff --git a/src/sed/loader/cfel/dataframe.py b/src/sed/loader/cfel/dataframe.py new file mode 100644 index 00000000..c79321be --- /dev/null +++ b/src/sed/loader/cfel/dataframe.py @@ -0,0 +1,216 @@ +""" +This module creates pandas DataFrames from HDF5 files for different levels of data granularity +[per electron, per pulse, and per train]. 
It efficiently handles concatenation of data from +various channels within the HDF5 file, making use of the structured nature data to optimize +join operations. This approach significantly enhances performance compared to earlier. +""" +from __future__ import annotations + +from pathlib import Path + +import h5py +import numpy as np +import pandas as pd + +from sed.core.logging import setup_logging +from sed.loader.flash.utils import get_channels +from sed.loader.flash.utils import InvalidFileError + +logger = setup_logging("cfel_dataframe_creator") + + +class DataFrameCreator: + """ + A class for creating pandas DataFrames from an HDF5 file for HEXTOF lab data at CFEL. + + Attributes: + h5_file (h5py.File): The HDF5 file object. + multi_index (pd.MultiIndex): The multi-index structure for the DataFrame. + _config (dict): The configuration dictionary for the DataFrame. + """ + + def __init__(self, config_dataframe: dict, h5_path: Path) -> None: + """ + Initializes the DataFrameCreator class. + + Args: + config_dataframe (dict): The configuration dictionary with only the dataframe key. + h5_path (Path): Path to the h5 file. + """ + self.h5_file = h5py.File(h5_path, "r") + self._config = config_dataframe + + index_alias = self._config.get("index", ["countId"])[0] + # all values except the last as slow data starts from start of file + self.index = np.cumsum([0, *self.get_dataset_array(index_alias)]) + + def get_dataset_key(self, channel: str) -> str: + """ + Checks if 'dataset_key' exists and returns that. + + Args: + channel (str): The name of the channel. + + Returns: + str: The 'dataset_key'. + + Raises: + ValueError: If 'dataset_key' is not provided. + """ + channel_config = self._config["channels"][channel] + if "dataset_key" in channel_config: + return channel_config["dataset_key"] + error = f"For channel: {channel}, provide 'dataset_key'." + raise ValueError(error) + + def get_dataset_array( + self, + channel: str, + ) -> h5py.Dataset: + """ + Returns a numpy array for a given channel name. + + Args: + channel (str): The name of the channel. + slice_ (bool): Applies slicing on the dataset. Default is True. + + Returns: + h5py.Dataset: The channel's data as a h5py.Dataset object. + """ + # Get the data from the necessary h5 file and channel + dataset_key = self.get_dataset_key(channel) + dataset = self.h5_file[dataset_key] + + return dataset + + @property + def df_electron(self) -> pd.DataFrame: + """ + Returns a pandas DataFrame for channel names of type [per electron]. + + Returns: + pd.DataFrame: The pandas DataFrame for the 'per_electron' channel's data. + """ + # Get the relevant channels and their slice index + channels = get_channels(self._config, "per_electron") + if channels == []: + return pd.DataFrame() + + series = {channel: pd.Series(self.get_dataset_array(channel)) for channel in channels} + dataframe = pd.concat(series, axis=1) + return dataframe.dropna() + + @property + def df_train(self) -> pd.DataFrame: + """ + Returns a pandas DataFrame for given channel names of type [per pulse]. + + Returns: + pd.DataFrame: The pandas DataFrame for the 'per_train' channel's data. 
+ """ + series = [] + # Get the relevant channel names + channels = get_channels(self._config, "per_train") + # auxiliary dataset (which is stored in the same dataset as other DLD channels) + aux_alias = self._config.get("aux_alias", "dldAux") + + # For each channel, a pd.Series is created and appended to the list + for channel in channels: + dataset = self.get_dataset_array(channel) + + if channel == aux_alias: + try: + sub_channels = self._config["channels"][aux_alias]["sub_channels"] + except KeyError: + raise KeyError( + f"Provide 'sub_channels' for auxiliary channel '{aux_alias}'.", + ) + for name, values in sub_channels.items(): + series.append( + pd.Series( + dataset[:, values["slice"]], + self.index[:-1], + name=name, + ), + ) + else: + series.append(pd.Series(dataset, self.index[:-1], name=channel)) + # All the channels are concatenated to a single DataFrame + return pd.concat(series, axis=1) + + @property + def df_timestamp(self) -> pd.DataFrame: + """ + Uses the first_event_time_stamp_key to get initial timestamp and the + ms_markers_key which is a dataset of exposure times same size as the index.""" + + first_timestamp = self.h5_file[self._config.get("first_event_time_stamp_key")][ + 0 + ] # single value + ts_start = pd.to_datetime(first_timestamp.decode()) + # actually in seconds but using milliseconds for consistency with mpes loader + exposure_time = self.h5_file[self._config.get("ms_markers_key")][()] + + # Calculate cumulative exposure times + cumulative_exposure = np.cumsum(exposure_time) + timestamps = [ts_start + pd.Timedelta(seconds=cum_exp) for cum_exp in cumulative_exposure] + # add initial timestamp to the start of the list + timestamps.insert(0, ts_start) + + timestamps = [(ts - pd.Timestamp("1970-01-01")) // pd.Timedelta("1s") for ts in timestamps] + # Create a DataFrame with the timestamps + ts_alias = self._config["columns"].get("timestamp") + df = pd.DataFrame({ts_alias: timestamps}, index=self.index) + return df + + def validate_channel_keys(self) -> None: + """ + Validates if the dataset keys for all channels in the config exist in the h5 file. + + Raises: + InvalidFileError: If the dataset keys are missing in the h5 file. + """ + invalid_channels = [] + for channel in self._config["channels"]: + dataset_key = self.get_dataset_key(channel) + if dataset_key not in self.h5_file: + invalid_channels.append(channel) + + if invalid_channels: + raise InvalidFileError(invalid_channels) + + @property + def df(self) -> pd.DataFrame: + """ + Joins the 'per_electron', 'per_pulse' using concat operation, + returning a single dataframe. + + Returns: + pd.DataFrame: The combined pandas DataFrame. + """ + + self.validate_channel_keys() + df_train = self.df_train + df_timestamp = self.df_timestamp + df = pd.concat((self.df_electron, df_train, df_timestamp), axis=1) + ffill_cols = list(df_train.columns) + list(df_timestamp.columns) + df[ffill_cols] = df[ffill_cols].ffill() + df.index.name = self._config.get("index", ["countId"])[0] + return df + + @property + def df_timed(self) -> pd.DataFrame: + """ + Joins the 'per_electron', 'per_pulse' using concat operation, + returning a single dataframe. + + Returns: + pd.DataFrame: The combined pandas DataFrame. 
+ """ + + self.validate_channel_keys() + df_train = self.df_train + df_timestamp = self.df_timestamp + df = pd.concat((self.df_electron, df_train, df_timestamp), axis=1, join="inner") + df.index.name = self._config.get("index", ["countId"])[0] + return df diff --git a/src/sed/loader/cfel/loader.py b/src/sed/loader/cfel/loader.py new file mode 100644 index 00000000..d27db72a --- /dev/null +++ b/src/sed/loader/cfel/loader.py @@ -0,0 +1,526 @@ +""" +This module implements the flash data loader. +This loader currently supports hextof, wespe and instruments with similar structure. +The raw hdf5 data is combined and saved into buffer files and loaded as a dask dataframe. +The dataframe is an amalgamation of all h5 files for a combination of runs, where the NaNs are +automatically forward-filled across different files. +This can then be saved as a parquet for out-of-sed processing and reread back to access other +sed functionality. +""" +from __future__ import annotations + +import re +import time +from collections.abc import Sequence +from pathlib import Path + +import dask.dataframe as dd +import h5py +import numpy as np +import scipy.interpolate as sint +from natsort import natsorted + +from sed.core.logging import set_verbosity +from sed.core.logging import setup_logging +from sed.loader.base.loader import BaseLoader +from sed.loader.cfel.buffer_handler import BufferHandler +from sed.loader.flash.metadata import MetadataRetriever + +# Configure logging +logger = setup_logging("flash_loader") + + +def get_count_rate( + h5file: h5py.File, + ms_markers_key: str = "msMarkers", +) -> tuple[np.ndarray, np.ndarray]: + """Create count rate in the file from the msMarker column. + + Args: + h5file (h5py.File): The h5file from which to get the count rate. + ms_markers_key (str, optional): The hdf5 path where the millisecond markers + are stored. Defaults to "msMarkers". + + Returns: + tuple[np.ndarray, np.ndarray]: The count rate in Hz and the seconds into the + scan. + """ + ms_markers = np.asarray(h5file[ms_markers_key]) + secs = np.arange(0, len(ms_markers)) / 1000 + msmarker_spline = sint.InterpolatedUnivariateSpline(secs, ms_markers, k=1) + rate_spline = msmarker_spline.derivative() + count_rate = rate_spline(secs) + + return (count_rate, secs) + + +class CFELLoader(BaseLoader): + """ + The class generates multiindexed multidimensional pandas dataframes from the new FLASH + dataformat resolved by both macro and microbunches alongside electrons. + Only the read_dataframe (inherited and implemented) method is accessed by other modules. + + Args: + config (dict, optional): Config dictionary. Defaults to None. + verbose (bool, optional): Option to print out diagnostic information. + Defaults to True. + """ + + __name__ = "cfel" + + supported_file_types = ["h5"] + + def __init__(self, config: dict, verbose: bool = True) -> None: + """ + Initializes the FlashLoader. + + Args: + config (dict): Configuration dictionary. + verbose (bool, optional): Option to print out diagnostic information. + """ + super().__init__(config=config, verbose=verbose) + + set_verbosity(logger, self._verbose) + + self.instrument: str = self._config["core"].get("instrument", "hextof") # default is hextof + self.beamtime_dir: str = None + self.raw_dir: str = None + self.processed_dir: str = None + self.meta_dir: str = None + + @property + def verbose(self) -> bool: + """Accessor to the verbosity flag. + + Returns: + bool: Verbosity flag. 
+ """ + return self._verbose + + @verbose.setter + def verbose(self, verbose: bool): + """Setter for the verbosity. + + Args: + verbose (bool): Option to turn on verbose output. Sets loglevel to INFO. + """ + self._verbose = verbose + set_verbosity(logger, self._verbose) + + def _initialize_dirs(self) -> None: + """ + Initializes the directories on Maxwell based on configuration. If paths is provided in + the configuration, the raw data directory and parquet data directory are taken from there. + Otherwise, the beamtime_id and year are used to locate the data directories. + The first path that has either online- or express- prefix, or the daq name is taken as the + raw data directory. + + Raises: + ValueError: If required values are missing from the configuration. + FileNotFoundError: If the raw data directories are not found. + """ + # Parses to locate the raw beamtime directory from config file + # Only raw_dir is necessary, processed_dir can be based on raw_dir, if not provided + if "paths" in self._config["core"]: + raw_dir = Path(self._config["core"]["paths"].get("raw", "")) + print(raw_dir) + processed_dir = Path( + self._config["core"]["paths"].get("processed", raw_dir.joinpath("processed")), + ) + meta_dir = Path( + self._config["core"]["paths"].get("meta", raw_dir.joinpath("meta")), + ) + beamtime_dir = Path(raw_dir).parent + + else: + try: + beamtime_id = self._config["core"]["beamtime_id"] + year = self._config["core"]["year"] + + except KeyError as exc: + raise ValueError( + "The beamtime_id and year are required.", + ) from exc + + beamtime_dir = Path( + self._config["core"]["beamtime_dir"][self._config["core"]["beamline"]], + ) + beamtime_dir = beamtime_dir.joinpath(f"{year}/data/{beamtime_id}/") + + # Use pathlib walk to reach the raw data directory + raw_paths: list[Path] = [] + + for path in beamtime_dir.joinpath("raw").glob("**/*"): + if path.is_dir(): + dir_name = path.name + if dir_name.startswith(("online-", "express-")): + raw_paths.append(path.joinpath(self._config["dataframe"]["daq"])) + elif dir_name == self._config["dataframe"]["daq"].upper(): + raw_paths.append(path) + + if not raw_paths: + raise FileNotFoundError("Raw data directories not found.") + + raw_dir = raw_paths[0].resolve() + + processed_dir = beamtime_dir.joinpath("processed") + meta_dir = beamtime_dir.joinpath("meta/fabtrack/") # cspell:ignore fabtrack + + processed_dir.mkdir(parents=True, exist_ok=True) + + self.beamtime_dir = str(beamtime_dir) + self.raw_dir = str(raw_dir) + self.processed_dir = str(processed_dir) + self.meta_dir = str(meta_dir) + + @property + def available_runs(self) -> list[int]: + # Get all files in raw_dir with "run" in their names + files = list(Path(self.raw_dir).glob("*run*")) + + # Extract run IDs from filenames + run_ids = set() + for file in files: + match = re.search(r"run(\d+)", file.name) + if match: + run_ids.add(int(match.group(1))) + + # Return run IDs in sorted order + return sorted(list(run_ids)) + + def get_files_from_run_id( # type: ignore[override] + self, + run_id: str | int, + folders: str | Sequence[str] = None, + extension: str = "h5", + ) -> list[str]: + """ + Returns a list of filenames for a given run located in the specified directory + for the specified data acquisition (daq). + + Args: + run_id (str | int): The run identifier to locate. + folders (str | Sequence[str], optional): The directory(ies) where the raw + data is located. Defaults to config["core"]["base_folder"]. + extension (str, optional): The file extension. Defaults to "h5". 
+ + Returns: + list[str]: A list of path strings representing the collected file names. + + Raises: + FileNotFoundError: If no files are found for the given run in the directory. + """ + # Define the stream name prefixes based on the data acquisition identifier + stream_name_prefixes = self._config["core"].get("stream_name_prefixes") + + if folders is None: + folders = self._config["core"]["base_folder"] + + if isinstance(folders, str): + folders = [folders] + + daq = self._config["dataframe"]["daq"] + + # Generate the file patterns to search for in the directory + if stream_name_prefixes: + file_pattern = f"{stream_name_prefixes[daq]}_run{run_id}_*." + extension + else: + file_pattern = f"*{run_id}*." + extension + + files: list[Path] = [] + # Use pathlib to search for matching files in each directory + for folder in folders: + files.extend( + natsorted( + Path(folder).glob(file_pattern), + key=lambda filename: str(filename).rsplit("_", maxsplit=1)[-1], + ), + ) + + # Check if any files are found + if not files: + raise FileNotFoundError( + f"No files found for run {run_id} in directory {str(folders)}", + ) + + # Return the list of found files + return [str(file.resolve()) for file in files] + + def parse_scicat_metadata(self, token: str = None) -> dict: + """Uses the MetadataRetriever class to fetch metadata from scicat for each run. + + Returns: + dict: Metadata dictionary + token (str, optional):: The scicat token to use for fetching metadata + """ + metadata_retriever = MetadataRetriever(self._config["metadata"], token) + metadata = metadata_retriever.get_metadata( + beamtime_id=self._config["core"]["beamtime_id"], + runs=self.runs, + metadata=self.metadata, + ) + + return metadata + + def parse_local_metadata(self) -> dict: + """Uses the MetadataRetriever class to fetch metadata from local folder for each run. + + Returns: + dict: Metadata dictionary + """ + metadata_retriever = MetadataRetriever(self._config["metadata"]) + metadata = metadata_retriever.get_local_metadata( + beamtime_id=self._config["core"]["beamtime_id"], + beamtime_dir=self.beamtime_dir, + meta_dir=self.meta_dir, + runs=self.runs, + metadata=self.metadata, + ) + + return metadata + + def get_count_rate( + self, + fids: Sequence[int] = None, + **kwds, + ) -> tuple[np.ndarray, np.ndarray]: + """Create count rate from the msMarker column for the files specified in + ``fids``. + + Args: + fids (Sequence[int], optional): fids (Sequence[int]): the file ids to + include. Defaults to list of all file ids. + kwds: Keyword arguments: + + - **ms_markers_key**: HDF5 path of the ms-markers + + Returns: + tuple[np.ndarray, np.ndarray]: Arrays containing countrate and seconds + into the scan. + """ + if fids is None: + fids = range(0, len(self.files)) + + ms_markers_key = kwds.pop( + "ms_markers_key", + self._config.get("dataframe", {}).get( + "ms_markers_key", + "msMarkers", + ), + ) + + if len(kwds) > 0: + raise TypeError(f"get_count_rate() got unexpected keyword arguments {kwds.keys()}.") + + secs_list = [] + count_rate_list = [] + accumulated_time = 0 + for fid in fids: + try: + count_rate_, secs_ = get_count_rate( + h5py.File(self.files[fid]), + ms_markers_key=ms_markers_key, + ) + secs_list.append((accumulated_time + secs_).T) + count_rate_list.append(count_rate_.T) + accumulated_time += secs_[-1] + except OSError as exc: + if "Unable to synchronously open file" in str(exc): + logger.warning( + f"Unable to open file {fid}: {str(exc)}. 
" + "Most likely the file is incomplete.", + ) + pass + + count_rate = np.concatenate(count_rate_list) + secs = np.concatenate(secs_list) + + return count_rate, secs + + def get_elapsed_time(self, fids: Sequence[int] = None, **kwds) -> float | list[float]: # type: ignore[override] + """ + Calculates the elapsed time. + + Args: + fids (Sequence[int]): A sequence of file IDs. Defaults to all files. + + Keyword Args: + runs: A sequence of run IDs. Takes precedence over fids. + aggregate: Whether to return the sum of the elapsed times across + the specified files or the elapsed time for each file. Defaults to True. + + Returns: + float | list[float]: The elapsed time(s) in seconds. + + Raises: + KeyError: If a file ID in fids or a run ID in 'runs' does not exist in the metadata. + """ + try: + file_statistics = self.metadata["file_statistics"]["timed"] + except Exception as exc: + raise KeyError( + "File statistics missing. Use 'read_dataframe' first.", + ) from exc + time_stamp_alias = self._config["dataframe"].get("time_stamp_alias", "timeStamp") + + def get_elapsed_time_from_fid(fid): + try: + fid = str(fid) # Ensure the key is a string + time_stamps = file_statistics[fid]["columns"][time_stamp_alias] + print(f"Time stamp max: {time_stamps['max']}") + print(f"Time stamp min: {time_stamps['min']}") + elapsed_time = time_stamps["max"] - time_stamps["min"] + except KeyError as exc: + raise KeyError( + f"Timestamp metadata missing in file {fid}. " + "Add timestamp column and alias to config before loading.", + ) from exc + + return elapsed_time + + def get_elapsed_time_from_run(run_id): + if self.raw_dir is None: + self._initialize_dirs() + files = self.get_files_from_run_id(run_id=run_id, folders=self.raw_dir) + fids = [self.files.index(file) for file in files] + return sum(get_elapsed_time_from_fid(fid) for fid in fids) + + elapsed_times = [] + runs = kwds.pop("runs", None) + aggregate = kwds.pop("aggregate", True) + + if len(kwds) > 0: + raise TypeError(f"get_elapsed_time() got unexpected keyword arguments {kwds.keys()}.") + + if runs is not None: + elapsed_times = [get_elapsed_time_from_run(run) for run in runs] + else: + if fids is None: + fids = range(len(self.files)) + elapsed_times = [get_elapsed_time_from_fid(fid) for fid in fids] + + if aggregate: + elapsed_times = sum(elapsed_times) + + return elapsed_times + + def read_dataframe( + self, + files: str | Sequence[str] = None, + folders: str | Sequence[str] = None, + runs: str | int | Sequence[str | int] = None, + ftype: str = "h5", + metadata: dict = {}, + collect_metadata: bool = False, + **kwds, + ) -> tuple[dd.DataFrame, dd.DataFrame, dict]: + """ + Read express data from the DAQ, generating a parquet in between. + + Args: + files (str | Sequence[str], optional): File path(s) to process. Defaults to None. + folders (str | Sequence[str], optional): Path to folder(s) where files are stored + Path has priority such that if it's specified, the specified files will be ignored. + Defaults to None. + runs (str | int | Sequence[str | int], optional): Run identifier(s). + Corresponding files will be located in the location provided by ``folders``. + Takes precedence over ``files`` and ``folders``. Defaults to None. + ftype (str, optional): The file extension type. Defaults to "h5". + metadata (dict, optional): Additional metadata. Defaults to None. + collect_metadata (bool, optional): Whether to collect metadata. Defaults to False. + + Keyword Args: + detector (str, optional): The detector to use. Defaults to "". 
+ force_recreate (bool, optional): Whether to force recreation of the buffer files. + Defaults to False. + processed_dir (str, optional): The directory to save the processed files. + Defaults to None. + debug (bool, optional): Whether to run buffer creation in serial. Defaults to False. + remove_invalid_files (bool, optional): Whether to exclude invalid files. + Defaults to False. + token (str, optional): The scicat token to use for fetching metadata. If provided, + will be saved to .env file for future use. If not provided, will check environment + variables when collect_metadata is True. + filter_timed_by_electron (bool, optional): When True, the timed dataframe will only + contain data points where valid electron events were detected. When False, all + timed data points are included regardless of electron detection. Defaults to True. + + Returns: + tuple[dd.DataFrame, dd.DataFrame, dict]: A tuple containing the concatenated DataFrame + and metadata. + + Raises: + ValueError: If neither 'runs' nor 'files'/'raw_dir' is provided. + FileNotFoundError: If the conversion fails for some files or no data is available. + ValueError: If collect_metadata is True and no token is available. + """ + detector = kwds.pop("detector", "") + force_recreate = kwds.pop("force_recreate", False) + processed_dir = kwds.pop("processed_dir", None) + debug = kwds.pop("debug", False) + remove_invalid_files = kwds.pop("remove_invalid_files", False) + token = kwds.pop("token", None) + filter_timed_by_electron = kwds.pop("filter_timed_by_electron", True) + + if len(kwds) > 0: + raise ValueError(f"Unexpected keyword arguments: {kwds.keys()}") + t0 = time.time() + + self._initialize_dirs() + # Prepare a list of names for the runs to read and parquets to write + if runs is not None: + files = [] + runs_ = [str(runs)] if isinstance(runs, (str, int)) else list(map(str, runs)) + for run in runs_: + run_files = self.get_files_from_run_id( + run_id=run, + folders=self.raw_dir, + ) + files.extend(run_files) + self.runs = runs_ + super().read_dataframe(files=files, ftype=ftype) + else: + # This call takes care of files and folders. As we have converted runs into files + # already, they are just stored in the class by this call. 
+ super().read_dataframe( + files=files, + folders=folders, + ftype=ftype, + metadata=metadata, + ) + + bh = BufferHandler( + config=self._config, + ) + + # if processed_dir is None, use self.processed_dir + processed_dir = processed_dir or self.processed_dir + processed_dir = Path(processed_dir) + + # Obtain the parquet filenames, metadata, and schema from the method + # which handles buffer file creation/reading + h5_paths = [Path(file) for file in self.files] + df, df_timed = bh.process_and_load_dataframe( + h5_paths=h5_paths, + folder=processed_dir, + force_recreate=force_recreate, + suffix=detector, + debug=debug, + remove_invalid_files=remove_invalid_files, + filter_timed_by_electron=filter_timed_by_electron, + ) + + if len(self.parse_scicat_metadata(token)) == 0: + print("No SciCat metadata available, checking local folder") + self.metadata.update(self.parse_local_metadata()) + else: + print("Metadata taken from SciCat") + self.metadata.update(self.parse_scicat_metadata(token) if collect_metadata else {}) + self.metadata.update(bh.metadata) + + print(f"loading complete in {time.time() - t0: .2f} s") + + return df, df_timed, self.metadata + + +LOADER = CFELLoader diff --git a/src/sed/loader/flash/buffer_handler.py b/src/sed/loader/flash/buffer_handler.py index d56de29f..b68de4d4 100644 --- a/src/sed/loader/flash/buffer_handler.py +++ b/src/sed/loader/flash/buffer_handler.py @@ -1,13 +1,14 @@ from __future__ import annotations import os -from pathlib import Path import time +from pathlib import Path import dask.dataframe as dd import pyarrow.parquet as pq from joblib import delayed from joblib import Parallel +from pandas import MultiIndex from sed.core.dfops import forward_fill_lazy from sed.core.logging import setup_logging @@ -40,11 +41,9 @@ class BufferFilePaths: def __init__( self, - config: dict, h5_paths: list[Path], folder: Path, suffix: str, - remove_invalid_files: bool, ) -> None: """Initializes the BufferFilePaths. 
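The hunk above narrows the BufferFilePaths constructor to the H5 paths, the buffer folder, and an optional suffix; the invalid-file filtering moves into BufferHandler (see the following hunks). A minimal usage sketch of the new signature, with placeholder file names:

from pathlib import Path
from sed.loader.flash.buffer_handler import BufferFilePaths

h5_paths = [Path("run123_file1.h5"), Path("run123_file2.h5")]  # placeholder raw files
fp = BufferFilePaths(h5_paths, folder=Path("processed"), suffix="")
# file_sets_to_process() returns the raw/electron/timed path sets whose buffer files are still missing
print(fp.file_sets_to_process())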
@@ -57,9 +56,6 @@ def __init__( folder = folder / "buffer" folder.mkdir(parents=True, exist_ok=True) - if remove_invalid_files: - h5_paths = self.remove_invalid_files(config, h5_paths) - self._file_paths = self._create_file_paths(h5_paths, folder, suffix) def _create_file_paths( @@ -93,18 +89,6 @@ def file_sets_to_process(self, force_recreate: bool = False) -> list[dict[str, P return self._file_paths return [file_set for file_set in self if any(not file_set[key].exists() for key in DF_TYP)] - def remove_invalid_files(self, config, h5_paths: list[Path]) -> list[Path]: - valid_h5_paths = [] - for h5_path in h5_paths: - try: - dfc = DataFrameCreator(config_dataframe=config, h5_path=h5_path) - dfc.validate_channel_keys() - valid_h5_paths.append(h5_path) - except InvalidFileError as e: - logger.info(f"Skipping invalid file: {h5_path.stem}\n{e}") - - return valid_h5_paths - class BufferHandler: """ @@ -125,14 +109,27 @@ def __init__( self.n_cores: int = config["core"].get("num_cores", os.cpu_count() - 1) self.fp: BufferFilePaths = None self.df: dict[str, dd.DataFrame] = {typ: None for typ in DF_TYP} + fill_formats = self._config.get("fill_formats", ["per_train", "per_pulse"]) self.fill_channels: list[str] = get_channels( self._config, - ["per_pulse", "per_train"], + fill_formats, extend_aux=True, ) self.metadata: dict = {} self.filter_timed_by_electron: bool = None + def _validate_h5_files(self, config, h5_paths: list[Path]) -> list[Path]: + valid_h5_paths = [] + for h5_path in h5_paths: + try: + dfc = DataFrameCreator(config_dataframe=config, h5_path=h5_path) + dfc.validate_channel_keys() + valid_h5_paths.append(h5_path) + except InvalidFileError as e: + logger.info(f"Skipping invalid file: {h5_path.stem}\n{e}") + + return valid_h5_paths + def _schema_check(self, files: list[Path], expected_schema_set: set) -> None: """ Checks the schema of the Parquet files. 
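The hunk above makes the forward-fill selection configurable: BufferHandler now reads dataframe.fill_formats instead of hard-coding per_pulse/per_train, so the CFEL config can fill only per_train channels while FLASH keeps filling per_pulse and per_train. A minimal sketch of the resulting channel selection, using an assumed toy channel layout rather than a real beamtime config:

from sed.loader.flash.utils import get_channels

config_dataframe = {
    "index": ["trainId", "pulseId", "electronId"],
    "formats": ["per_train", "per_pulse", "per_electron"],
    "fill_formats": ["per_train", "per_pulse"],
    "channels": {
        "dldPosX": {"format": "per_electron", "dataset_key": "x"},
        "bam": {"format": "per_pulse", "dataset_key": "bam"},
        "delayStage": {"format": "per_train", "dataset_key": "delay"},
    },
}
fill_formats = config_dataframe.get("fill_formats", ["per_train", "per_pulse"])
fill_channels = get_channels(config_dataframe, fill_formats, extend_aux=True)
print(fill_channels)  # ['delayStage', 'bam']: only the non-electron channels are forward filled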
@@ -182,8 +179,7 @@ def _create_timed_dataframe(self, df: dd.DataFrame) -> dd.DataFrame: # Take all timed data rows without filtering df_timed = df[timed_channels] - # Take only first electron per event - return df_timed.loc[:, :, 0] + return df_timed def _save_buffer_file(self, paths: dict[str, Path]) -> None: """Creates the electron and timed buffer files from the raw H5 file.""" @@ -205,6 +201,12 @@ def _save_buffer_file(self, paths: dict[str, Path]) -> None: # Create and save timed dataframe df_timed = self._create_timed_dataframe(df) + # timed dataframe + if isinstance(df.index, MultiIndex): + # drop the electron channels and only take rows with the first electronId + df_timed = df[self.fill_channels].loc[:, :, 0] + else: + df_timed = df[self.fill_channels] dtypes = get_dtypes(self._config, df_timed.columns.values) timed_df = df_timed.astype(dtypes).reset_index() logger.debug(f"Saving timed buffer with shape: {timed_df.shape}") @@ -251,25 +253,26 @@ def _get_dataframes(self) -> None: filling = {} for typ in DF_TYP: # Read the parquet files into a dask dataframe - df = dd.read_parquet(self.fp[typ], calculate_divisions=True) + df = dd.read_parquet(self.fp[typ]) # , calculate_divisions=True) # Get the metadata from the parquet files file_stats[typ] = get_parquet_metadata(self.fp[typ]) # Forward fill the non-electron channels across files overlap = min(file["num_rows"] for file in file_stats[typ].values()) iterations = self._config.get("forward_fill_iterations", 2) - df = forward_fill_lazy( - df=df, - columns=self.fill_channels, - before=overlap, - iterations=iterations, - ) - # TODO: This dict should be returned by forward_fill_lazy - filling[typ] = { - "columns": self.fill_channels, - "overlap": overlap, - "iterations": iterations, - } + if iterations: + df = forward_fill_lazy( + df=df, + columns=self.fill_channels, + before=overlap, + iterations=iterations, + ) + # TODO: This dict should be returned by forward_fill_lazy + filling[typ] = { + "columns": self.fill_channels, + "overlap": overlap, + "iterations": iterations, + } self.df[typ] = df self.metadata.update({"file_statistics": file_stats, "filling": filling}) @@ -311,8 +314,11 @@ def process_and_load_dataframe( Returns: Tuple[dd.DataFrame, dd.DataFrame]: The electron and timed dataframes. 
""" - self.fp = BufferFilePaths(self._config, h5_paths, folder, suffix, remove_invalid_files) self.filter_timed_by_electron = filter_timed_by_electron + if remove_invalid_files: + h5_paths = self._validate_h5_files(self._config, h5_paths) + + self.fp = BufferFilePaths(h5_paths, folder, suffix) if not force_recreate: schema_set = set( diff --git a/src/sed/loader/flash/dataframe.py b/src/sed/loader/flash/dataframe.py index f50abe10..61bc6aa6 100644 --- a/src/sed/loader/flash/dataframe.py +++ b/src/sed/loader/flash/dataframe.py @@ -12,9 +12,9 @@ import numpy as np import pandas as pd +from sed.core.logging import setup_logging from sed.loader.flash.utils import get_channels from sed.loader.flash.utils import InvalidFileError -from sed.core.logging import setup_logging logger = setup_logging("flash_dataframe_creator") @@ -39,8 +39,8 @@ def __init__(self, config_dataframe: dict, h5_path: Path) -> None: """ logger.debug(f"Initializing DataFrameCreator for file: {h5_path}") self.h5_file = h5py.File(h5_path, "r") - self.multi_index = get_channels(index=True) self._config = config_dataframe + self.multi_index = get_channels(self._config, index=True) def get_index_dataset_key(self, channel: str) -> tuple[str, str]: """ diff --git a/src/sed/loader/flash/instruments.py b/src/sed/loader/flash/instruments.py deleted file mode 100644 index 8ef0146e..00000000 --- a/src/sed/loader/flash/instruments.py +++ /dev/null @@ -1,9 +0,0 @@ -from __future__ import annotations - -from dask import dataframe as dd - - -def wespe_convert(df: dd.DataFrame, df_timed: dd.DataFrame) -> tuple[dd.DataFrame, dd.DataFrame]: - df - df_timed - raise NotImplementedError("This function is not implemented yet.") diff --git a/src/sed/loader/flash/loader.py b/src/sed/loader/flash/loader.py index c2cf79b9..a01acbb7 100644 --- a/src/sed/loader/flash/loader.py +++ b/src/sed/loader/flash/loader.py @@ -1,6 +1,5 @@ """ This module implements the flash data loader. -This loader currently supports hextof, wespe and instruments with similar structure. The raw hdf5 data is combined and saved into buffer files and loaded as a dask dataframe. The dataframe is an amalgamation of all h5 files for a combination of runs, where the NaNs are automatically forward-filled across different files. 
@@ -21,7 +20,6 @@ from sed.core.logging import setup_logging from sed.loader.base.loader import BaseLoader from sed.loader.flash.buffer_handler import BufferHandler -from sed.loader.flash.instruments import wespe_convert from sed.loader.flash.metadata import MetadataRetriever # Configure logging @@ -401,9 +399,6 @@ def read_dataframe( filter_timed_by_electron=filter_timed_by_electron, ) - if self.instrument == "wespe": - df, df_timed = wespe_convert(df, df_timed) - self.metadata.update(self.parse_metadata(token) if collect_metadata else {}) self.metadata.update(bh.metadata) diff --git a/src/sed/loader/flash/metadata.py b/src/sed/loader/flash/metadata.py index 578fa9fd..43b20bf8 100644 --- a/src/sed/loader/flash/metadata.py +++ b/src/sed/loader/flash/metadata.py @@ -5,6 +5,8 @@ from __future__ import annotations import requests +import json +import yaml from sed.core.config import read_env_var from sed.core.config import save_env_var @@ -128,19 +130,109 @@ def _get_metadata_per_run(self, pid: str) -> dict: return {} # Return an empty dictionary for this run def _create_old_dataset_url(self, pid: str) -> str: - return "{burl}/{url}/%2F{npid}".format( + return "{burl}{url}/%2F{npid}".format( burl=self.url, - url="Datasets", + url="datasets",#"Datasets", npid=self._reformat_pid(pid), ) def _create_new_dataset_url(self, pid: str) -> str: - return "{burl}/{url}/{npid}".format( + return "{burl}{url}/{npid}".format( burl=self.url, - url="Datasets", + url="datasets",#"Datasets", npid=self._reformat_pid(pid), ) def _reformat_pid(self, pid: str) -> str: """SciCat adds a pid-prefix + "/" but at DESY prefix = "" """ return (pid).replace("/", "%2F") + + def get_local_metadata( + self, + beamtime_id: str, + beamtime_dir: str, + meta_dir: str, + runs: list, + metadata: dict = None, + ) -> dict: + """ + Retrieves metadata for a given beamtime ID and list of runs from local meta folder and yaml file. + + Args: + beamtime_id (str): The ID of the beamtime. + runs (list): A list of run IDs. + metadata (dict, optional): The existing metadata dictionary. + Defaults to None. + + Returns: + Dict: The updated metadata dictionary. + + Raises: + Exception: If the request to retrieve metadata fails. + """ + if metadata is None: + metadata = {} + + beamtime_metadata = self._get_beamtime_metadata(beamtime_dir,beamtime_id) + metadata.update(beamtime_metadata) + for run in runs: + logger.debug(f"Retrieving metadata for PID: {run}") + local_metadata_per_run = self._get_local_metadata_per_run(meta_dir,run) + local_metadata_per_run.update(local_metadata_per_run) # TODO: Not correct for multiple runs + + metadata.update({'scientificMetadata': local_metadata_per_run['_data']}) + + logger.debug(f"Retrieved metadata with {len(metadata)} entries") + return metadata + + def _get_beamtime_metadata( + self, + beamtime_dir: str, + beamtime_id: str, + ) -> dict: + """ + Retrieves general metadata for a given beamtime ID from beamtime-metadata-{beamtime_id}.json file + + Args: + beamtime_id (str): The ID of the beamtime. + meta_dir(str): The existing local metadata folder. + + Returns: + Dict: The retrieved metadata dictionary. + + Raises: + Exception: If the request to retrieve metadata fails. 
+ """ + try: + f = open(f'{beamtime_dir}/beamtime-metadata-{beamtime_id}.json', "r") + beamtime_metadata = json.loads(f.read()) + return beamtime_metadata + + except Exception as exception: + logger.warning(f"Failed to retrieve metadata for beamtime ID {beamtime_id}: {str(exception)}") + return {} # Return an empty dictionary for this beamtime ID + + + def _get_local_metadata_per_run(self, meta_dir: str, run: str) -> dict: + """ + Retrieves metadata for a specific run based on the PID from yaml file in the local beamtime folder. + + Args: + pid (str): The PID of the run. + + Returns: + dict: The retrieved metadata. + + Raises: + Exception: If the request to retrieve metadata fails. + """ + try: + run = str(run) + with open(f"{meta_dir}/{run}_1.yaml", 'r') as stream: + print("Getting metadata from local folder") + run_metadata = yaml.safe_load(stream) + return run_metadata + + except Exception as exception: + logger.warning(f"Failed to retrieve metadata for PID {run}: {str(exception)}") + return {"_data":{}} # Return an empty dictionary for this run diff --git a/src/sed/loader/flash/utils.py b/src/sed/loader/flash/utils.py index 85bca9a4..0f41aaaa 100644 --- a/src/sed/loader/flash/utils.py +++ b/src/sed/loader/flash/utils.py @@ -1,12 +1,6 @@ from __future__ import annotations -# TODO: move to config -MULTI_INDEX = ["trainId", "pulseId", "electronId"] -PULSE_ALIAS = MULTI_INDEX[1] -FORMATS = ["per_electron", "per_pulse", "per_train"] - - def get_channels( config_dataframe: dict = {}, formats: str | list[str] = None, @@ -29,7 +23,9 @@ def get_channels( List[str]: A list of channels with the specified format(s). """ channel_dict = config_dataframe.get("channels", {}) - aux_alias = config_dataframe.get("aux_alias", "dldAux") + index_list = config_dataframe.get("index", ["trainId", "pulseId", "electronId"]) + formats_list = config_dataframe.get("formats", ["per_train", "per_pulse", "per_electron"]) + aux_alias = channel_dict.get("auxiliary", "dldAux") # If 'formats' is a single string, convert it to a list for uniform processing. if isinstance(formats, str): @@ -39,7 +35,7 @@ def get_channels( if formats == ["all"]: channels = get_channels( config_dataframe, - FORMATS, + formats_list, index, extend_aux, ) @@ -47,24 +43,25 @@ def get_channels( channels = [] - # Include channels from multi_index if 'index' is True. + # Include channels from index_list if 'index' is True. if index: - channels.extend(MULTI_INDEX) + channels.extend(index_list) if formats: # If 'formats' is a list, check if all elements are valid. - err_msg = ( - "Invalid format. Please choose from 'per_electron', 'per_pulse', 'per_train', 'all'." - ) for format_ in formats: - if format_ not in FORMATS + ["all"]: - raise ValueError(err_msg) + if format_ not in formats_list + ["all"]: + raise ValueError( + f"Invalid format: {format_}. " f"Valid formats are: {formats_list + ['all']}", + ) # Get the available channels excluding 'pulseId'. available_channels = list(channel_dict.keys()) # pulse alias is an index and should not be included in the list of channels. - if PULSE_ALIAS in available_channels: - available_channels.remove(PULSE_ALIAS) + # Remove index channels if they are present in available_channels. + for channel in index_list: + if channel in available_channels: + available_channels.remove(channel) for format_ in formats: # Gather channels based on the specified format(s). @@ -75,7 +72,7 @@ def get_channels( ) # Include 'dldAuxChannels' if the format is 'per_train' and extend_aux is True. # Otherwise, include 'dldAux'. 
- if format_ == FORMATS[2] and aux_alias in available_channels: + if format_ == "per_train" and aux_alias in available_channels: if extend_aux: channels.extend( channel_dict[aux_alias]["sub_channels"].keys(), diff --git a/tests/data/loader/cfel/20250411_12h34m03s185_000123.h5 b/tests/data/loader/cfel/20250411_12h34m03s185_000123.h5 new file mode 100644 index 00000000..c7146891 Binary files /dev/null and b/tests/data/loader/cfel/20250411_12h34m03s185_000123.h5 differ diff --git a/tests/data/loader/cfel/config.yaml b/tests/data/loader/cfel/config.yaml new file mode 100644 index 00000000..f80b90d0 --- /dev/null +++ b/tests/data/loader/cfel/config.yaml @@ -0,0 +1,160 @@ +# This file contains the default configuration for the flash loader. + +core: + # defines the loader + loader: cfel + # Since this will run on maxwell most probably, we have a lot of cores at our disposal + num_cores: 10 + # the ID number of the beamtime + beamtime_id: 11021732 + # the year of the beamtime + year: 2025 + + # The paths to the raw and parquet data directories. If these are not + # provided, the loader will try to find the data based on year beamtimeID etc + paths: + # location of the raw data. + raw: "tests/data/loader/cfel/" + # location of the intermediate parquet files. + processed: "tests/data/loader/cfel/parquet" + + # The beamtime directories for different DAQ systems. + # (Not to be changed by user) + beamtime_dir: + pg2: "/asap3/flash/gpfs/pg2/" + cfel: "/asap3/fs-flash-o/gpfs/hextof/" + + +dataframe: + daq: fl1user3 # DAQ system name to resolve filenames/paths + ubid_offset: 5 # Offset correction to the pulseId + forward_fill_iterations: 0 # Number of iterations to fill the pulseId forward + split_sector_id_from_dld_time: True # Remove reserved bits for dldSectorID from dldTimeSteps column + sector_id_reserved_bits: 3 # Bits reserved for dldSectorID in the dldTimeSteps column + sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.] # Sector delays + + first_event_time_stamp_key: /ScanParam/StartTime + ms_markers_key: /SlowData/exposure_time + + # Time and binning settings + tof_binwidth: 2.0576131995767355E-11 # Base time-of-flight bin width in seconds + tof_binning: 8 # Binning parameter for time-of-flight data + + # Columns used for jitter correction + index: [countId] + jitter_cols: [dldPosX, dldPosY, dldTimeSteps] + formats: [per_file, per_train, per_electron] + fill_formats: [per_train] # Channels with this format will be forward filled + + # Column settings + columns: + x: dldPosX + corrected_x: X + kx: kx + y: dldPosY + corrected_y: Y + ky: ky + tof: dldTimeSteps + tof_ns: dldTime + corrected_tof: tm + timestamp: timeStamp + auxiliary: dldAux + sector_id: dldSectorID + delay: delayStage + corrected_delay: pumpProbeTime + + units: + # These are the units of the columns + dldPosX: 'step' + dldPosY: 'step' + dldTimeSteps: 'step' + tof_voltage: 'V' + extractorVoltage: 'V' + extractorCurrent: 'A' + cryoTemperature: 'K' + sampleTemperature: 'K' + dldTime: 'ns' + delay: 'ps' + timeStamp: 's' + energy: 'eV' + E: 'eV' + kx: '1/A' + ky: '1/A' + + # The channels to load. + # channels have the following structure: + # : + # format: per_pulse/per_electron/per_train + # index_key: the hdf5 index key + # dataset_key: the hdf5 dataset key + # slice: int to slice a multidimensional data along axis=1. 
If not defined, there is no slicing + # dtype: the datatype of the data + # subChannels: further aliases for if the data is multidimensional and needs to be split in different cols + # used currently for the auxiliary channel + # : + # slice: int to slice a multidimensional data along axis=1. Must be defined + # dtype: the datatype of the data + + channels: + # event key + countId: + format: per_file + dataset_key: /DLD/NumOfEvents + # detector x position + dldPosX: + format: per_electron + dataset_key: /DLD/DLD/xPos + # dtype: uint32 + + # detector y position + dldPosY: + format: per_electron + dataset_key: /DLD/DLD/yPos + # dtype: uint32 + + # Detector time-of-flight channel + # if split_sector_id_from_dld_time is set to True, This this will generate + # also the dldSectorID channel + dldTimeSteps: + format: per_electron + dataset_key: /DLD/DLD/times + # dtype: uint32 + + # The auxiliary channel has a special structure where the group further contains + # a multidimensional structure so further aliases are defined below + dldAux: + format: per_train + dataset_key: "/SlowData/hextof/dld/info/Aux" + sub_channels: + sampleBias: + slice: 0 + dtype: float32 + tofVoltage: + slice: 1 + dtype: float64 + extractorVoltage: + slice: 2 + extractorCurrent: + slice: 3 + cryoTemperature: + slice: 4 + sampleTemperature: + slice: 5 + dldTimeBinSize: + slice: 15 + + vuRead: + format: per_train + dataset_key: /SlowData/hextof/logic/kmic1/Sample_VURead + + + +# metadata collection from scicat +# metadata: +# archiver_url: + +# The nexus collection routine shall be finalized soon for both instruments +# nexus: +# reader: "mpes" +# definition: "NXmpes" +# input_files: ["NXmpes_config-HEXTOF.json"] diff --git a/tests/data/loader/flash/config.yaml b/tests/data/loader/flash/config.yaml index fbbcba25..90101c81 100644 --- a/tests/data/loader/flash/config.yaml +++ b/tests/data/loader/flash/config.yaml @@ -31,6 +31,7 @@ core: # (Not to be changed by user) beamtime_dir: pg2: "/asap3/flash/gpfs/pg2/" + cfel: "/asap3/fs-flash-o/gpfs/hextof/" dataframe: @@ -52,6 +53,10 @@ dataframe: sector_delays: [0., 0., 0., 0., 0., 0., 0., 0.] jitter_cols: ["dldPosX", "dldPosY", "dldTimeSteps"] + # The index and formats of the data + index: [trainId, pulseId, electronId] + formats: [per_train, per_pulse, per_electron] + fill_formats: [per_train, per_pulse] # Channels with this format will be forward filled columns: x: dldPosX corrected_x: X diff --git a/tests/loader/cfel/test_get_elapsed b/tests/loader/cfel/test_get_elapsed new file mode 100644 index 00000000..e69de29b diff --git a/tests/loader/flash/test_buffer_handler.py b/tests/loader/flash/test_buffer_handler.py index 3eb0e625..62c696c8 100644 --- a/tests/loader/flash/test_buffer_handler.py +++ b/tests/loader/flash/test_buffer_handler.py @@ -45,7 +45,7 @@ def test_buffer_file_paths(config: dict, h5_paths: list[Path]) -> None: the checks with modified file name parameters. 
""" folder = create_parquet_dir(config, "get_files_to_read") - fp = BufferFilePaths(config, h5_paths, folder, suffix="", remove_invalid_files=False) + fp = BufferFilePaths(h5_paths, folder, suffix="") # check that all files are to be read assert len(fp.file_sets_to_process()) == len(h5_paths) @@ -70,7 +70,7 @@ def test_buffer_file_paths(config: dict, h5_paths: list[Path]) -> None: bh._save_buffer_file(path) # check again for files to read and expect one less file - fp = BufferFilePaths(config, h5_paths, folder, suffix="", remove_invalid_files=False) + fp = BufferFilePaths(h5_paths, folder, suffix="") # check that only one file is to be read assert len(fp.file_sets_to_process()) == len(h5_paths) - 1 @@ -82,7 +82,7 @@ def test_buffer_file_paths(config: dict, h5_paths: list[Path]) -> None: Path(path["timed"]).unlink() # Test for adding a suffix - fp = BufferFilePaths(config, h5_paths, folder, "suffix", remove_invalid_files=False) + fp = BufferFilePaths(h5_paths, folder, "suffix") # expected buffer paths with prefix and suffix for typ in ["electron", "timed"]: diff --git a/tests/loader/flash/test_utils.py b/tests/loader/flash/test_utils.py index 929a9305..d65d8010 100644 --- a/tests/loader/flash/test_utils.py +++ b/tests/loader/flash/test_utils.py @@ -45,8 +45,8 @@ def test_get_channels_by_format(config_dataframe: dict) -> None: # Request channels for 'all' formats using a list. format_all = get_channels(ch_dict, ["all"]) - # Request index channels only. No need for channel_dict. - format_index = get_channels(index=True) + # Request index channels only. + format_index = get_channels(ch_dict, index=True) # Request 'per_electron' format and include index channels. format_index_electron = get_channels(ch_dict, ["per_electron"], index=True) diff --git a/tests/loader/test_loaders.py b/tests/loader/test_loaders.py index a5b357d0..da13fcad 100644 --- a/tests/loader/test_loaders.py +++ b/tests/loader/test_loaders.py @@ -22,7 +22,13 @@ test_data_dir = os.path.join(test_dir, "data") read_types = ["one_file", "files", "one_folder", "folders", "one_run", "runs"] -runs = {"generic": None, "mpes": ["30", "50"], "flash": ["43878", "43878"], "sxp": ["0016", "0016"]} +runs = { + "generic": None, + "mpes": ["30", "50"], + "flash": ["43878", "43878"], + "sxp": ["0016", "0016"], + "cfel": ["123"], +} def get_loader_name_from_loader_object(loader: BaseLoader) -> str: @@ -94,7 +100,7 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str) -> assert callable(loader.read_dataframe) # Fix for race condition during parallel testing - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: config = deepcopy(loader._config) # pylint: disable=protected-access config["core"]["paths"]["processed"] = Path( config["core"]["paths"]["processed"], @@ -167,7 +173,7 @@ def test_has_correct_read_dataframe_func(loader: BaseLoader, read_type: str) -> assert loaded_dataframe.npartitions == expected_size assert isinstance(loaded_metadata, dict) - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: loader = cast(FlashLoader, loader) loader._initialize_dirs() for file in os.listdir(Path(loader.processed_dir, "buffer")): @@ -183,7 +189,7 @@ def test_timed_dataframe(loader: BaseLoader) -> None: """ # Fix for race condition during parallel testing - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: config = deepcopy(loader._config) # pylint: disable=protected-access config["core"]["paths"]["processed"] = 
Path( config["core"]["paths"]["processed"], @@ -201,7 +207,7 @@ def test_timed_dataframe(loader: BaseLoader) -> None: collect_metadata=False, ) if loaded_timed_dataframe is None: - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: loader = cast(FlashLoader, loader) loader._initialize_dirs() for file in os.listdir(Path(loader.processed_dir, "buffer")): @@ -211,7 +217,7 @@ def test_timed_dataframe(loader: BaseLoader) -> None: assert set(loaded_timed_dataframe.columns).issubset(set(loaded_dataframe.columns)) assert loaded_timed_dataframe.npartitions == loaded_dataframe.npartitions - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: loader = cast(FlashLoader, loader) loader._initialize_dirs() for file in os.listdir(Path(loader.processed_dir, "buffer")): @@ -227,7 +233,7 @@ def test_get_count_rate(loader: BaseLoader) -> None: """ # Fix for race condition during parallel testing - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: config = deepcopy(loader._config) # pylint: disable=protected-access config["core"]["paths"]["processed"] = Path( config["core"]["paths"]["processed"], @@ -246,7 +252,7 @@ def test_get_count_rate(loader: BaseLoader) -> None: ) loaded_time, loaded_countrate = loader.get_count_rate() if loaded_time is None and loaded_countrate is None: - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: loader = cast(FlashLoader, loader) loader._initialize_dirs() for file in os.listdir(Path(loader.processed_dir, "buffer")): @@ -261,7 +267,7 @@ def test_get_count_rate(loader: BaseLoader) -> None: with pytest.raises(TypeError): loader.get_count_rate(illegal_kwd=True) - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: loader = cast(FlashLoader, loader) loader._initialize_dirs() for file in os.listdir(Path(loader.processed_dir, "buffer")): @@ -277,7 +283,7 @@ def test_get_elapsed_time(loader: BaseLoader) -> None: """ # Fix for race condition during parallel testing - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: config = deepcopy(loader._config) # pylint: disable=protected-access config["core"]["paths"]["processed"] = Path( config["core"]["paths"]["processed"], @@ -311,7 +317,7 @@ def test_get_elapsed_time(loader: BaseLoader) -> None: with pytest.raises(TypeError): loader.get_elapsed_time(illegal_kwd=True) - if loader.__name__ in {"flash", "sxp"}: + if loader.__name__ in {"flash", "sxp", "cfel"}: loader = cast(FlashLoader, loader) loader._initialize_dirs() for file in os.listdir(Path(loader.processed_dir, "buffer")):