API reference

earthcarekit.read

Reading utilities for EarthCARE product data.


FileAgency

Bases: FileInfoEnum

Source code in earthcarekit/utils/read/product/file_info/agency.py
class FileAgency(FileInfoEnum):
    ESA = "E"
    JAXA = "J"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileAgency":
        """Infers the EarthCARE product agency (i.e. ESA or JAXA) from a given file or dataset."""
        if isinstance(input, str):
            try:
                return cls[input.upper()]
            except AttributeError:
                pass
            except KeyError:
                pass
            try:
                return cls(input.upper())
            except ValueError:
                pass

        return get_file_agency(input)

from_input classmethod

from_input(input)

Infers the EarthCARE product agency (i.e. ESA or JAXA) from a given file or dataset.

Source code in earthcarekit/utils/read/product/file_info/agency.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileAgency":
    """Infers the EarthCARE product agency (i.e. ESA or JAXA) from a given file or dataset."""
    if isinstance(input, str):
        try:
            return cls[input.upper()]
        except AttributeError:
            pass
        except KeyError:
            pass
        try:
            return cls(input.upper())
        except ValueError:
            pass

    return get_file_agency(input)
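
Example (a minimal usage sketch; the top-level import path is an assumption — adjust to where FileAgency is exposed in your install):

from earthcarekit import FileAgency  # assumed import path

FileAgency.from_input("ESA")  # -> FileAgency.ESA (matched by enum name)
FileAgency.from_input("j")    # -> FileAgency.JAXA (matched by the value "J")

The same pattern applies to FileLatency and FileType below.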

FileLatency

Bases: FileInfoEnum

Source code in earthcarekit/utils/read/product/file_info/latency.py
class FileLatency(FileInfoEnum):
    NEAR_REAL_TIME = "N"
    OFFLINE = "O"
    NOT_APPLICABLE = "X"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileLatency":
        """Infers the EarthCARE product latency indicator (i.e. N for Near-real time, O for Offline, X for not applicable) from a given name, file or dataset."""
        if isinstance(input, str):
            try:
                return cls[input.upper()]
            except AttributeError:
                pass
            except KeyError:
                pass
            try:
                return cls(input.upper())
            except ValueError:
                pass

        return get_file_latency(input)

from_input classmethod

from_input(input)

Infers the EarthCARE product latency indicator (i.e. N for Near-real time, O for Offline, X for not applicable) from a given name, file or dataset.

Source code in earthcarekit/utils/read/product/file_info/latency.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileLatency":
    """Infers the EarthCARE product latency indicator (i.e. N for Near-real time, O for Offline, X for not applicable) from a given name, file or dataset."""
    if isinstance(input, str):
        try:
            return cls[input.upper()]
        except AttributeError:
            pass
        except KeyError:
            pass
        try:
            return cls(input.upper())
        except ValueError:
            pass

    return get_file_latency(input)

FileType

Bases: FileInfoEnum

Source code in earthcarekit/utils/read/product/file_info/type.py
class FileType(FileInfoEnum):
    # Level 1
    ATL_NOM_1B = "ATL_NOM_1B"
    ATL_DCC_1B = "ATL_DCC_1B"
    ATL_CSC_1B = "ATL_CSC_1B"
    ATL_FSC_1B = "ATL_FSC_1B"
    MSI_NOM_1B = "MSI_NOM_1B"
    MSI_BBS_1B = "MSI_BBS_1B"
    MSI_SD1_1B = "MSI_SD1_1B"
    MSI_SD2_1B = "MSI_SD2_1B"
    MSI_RGR_1C = "MSI_RGR_1C"
    BBR_NOM_1B = "BBR_NOM_1B"
    BBR_SNG_1B = "BBR_SNG_1B"
    BBR_SOL_1B = "BBR_SOL_1B"
    BBR_LIN_1B = "BBR_LIN_1B"
    CPR_NOM_1B = "CPR_NOM_1B"  # JAXA product
    # Level 2a
    ATL_FM__2A = "ATL_FM__2A"
    ATL_AER_2A = "ATL_AER_2A"
    ATL_ICE_2A = "ATL_ICE_2A"
    ATL_TC__2A = "ATL_TC__2A"
    ATL_EBD_2A = "ATL_EBD_2A"
    ATL_CTH_2A = "ATL_CTH_2A"
    ATL_ALD_2A = "ATL_ALD_2A"
    MSI_CM__2A = "MSI_CM__2A"
    MSI_COP_2A = "MSI_COP_2A"
    MSI_AOT_2A = "MSI_AOT_2A"
    CPR_FMR_2A = "CPR_FMR_2A"
    CPR_CD__2A = "CPR_CD__2A"
    CPR_TC__2A = "CPR_TC__2A"
    CPR_CLD_2A = "CPR_CLD_2A"
    CPR_APC_2A = "CPR_APC_2A"
    ATL_CLA_2A = "ATL_CLA_2A"  # JAXA product
    MSI_CLP_2A = "MSI_CLP_2A"  # JAXA product
    CPR_ECO_2A = "CPR_ECO_2A"  # JAXA product
    CPR_CLP_2A = "CPR_CLP_2A"  # JAXA product
    # Level 2b
    AM__MO__2B = "AM__MO__2B"
    AM__CTH_2B = "AM__CTH_2B"
    AM__ACD_2B = "AM__ACD_2B"
    AC__TC__2B = "AC__TC__2B"
    BM__RAD_2B = "BM__RAD_2B"
    BMA_FLX_2B = "BMA_FLX_2B"
    ACM_CAP_2B = "ACM_CAP_2B"
    ACM_COM_2B = "ACM_COM_2B"
    ACM_RT__2B = "ACM_RT__2B"
    ALL_DF__2B = "ALL_DF__2B"
    ALL_3D__2B = "ALL_3D__2B"
    AC__CLP_2B = "AC__CLP_2B"  # JAXA product
    ACM_CLP_2B = "ACM_CLP_2B"  # JAXA product
    ALL_RAD_2B = "ALL_RAD_2B"  # JAXA product
    # Auxiliary data
    AUX_MET_1D = "AUX_MET_1D"
    AUX_JSG_1D = "AUX_JSG_1D"
    # Orbit data
    MPL_ORBSCT = "MPL_ORBSCT"
    AUX_ORBPRE = "AUX_ORBPRE"
    AUX_ORBRES = "AUX_ORBRES"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileType":
        """Infers the EarthCARE product type from a given file or dataset."""
        if isinstance(input, str):
            try:
                return cls[format_file_type_string(input)]
            except AttributeError:
                pass
            except KeyError:
                pass
            try:
                return cls(format_file_type_string(input))
            except ValueError:
                pass
            except KeyError:
                pass

        return get_file_type(input)

    @classmethod
    def list(cls):
        return list(map(lambda c: c.value, cls))

    def to_shorthand(self, with_dash: bool = False):
        if with_dash:
            return _short_hand_map[self.value]
        else:
            return _short_hand_map[self.value].replace("-", "")

from_input classmethod

from_input(input)

Infers the EarthCARE product type from a given file or dataset.

Source code in earthcarekit/utils/read/product/file_info/type.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileType":
    """Infers the EarthCARE product type from a given file or dataset."""
    if isinstance(input, str):
        try:
            return cls[format_file_type_string(input)]
        except AttributeError:
            pass
        except KeyError:
            pass
        try:
            return cls(format_file_type_string(input))
        except ValueError:
            pass
        except KeyError:
            pass

    return get_file_type(input)
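
Example (a sketch; it assumes format_file_type_string() normalizes case, and the shorthand in the comments is illustrative only):

from earthcarekit import FileType  # assumed import path

ft = FileType.from_input("atl_nom_1b")  # -> FileType.ATL_NOM_1B (assuming case normalization)
ft.to_shorthand()                       # compact shorthand string from _short_hand_map
FileType.list()                         # all known product type strings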

ProductInfo dataclass

Class storing all info gathered from an EarthCARE product's file path.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
@dataclass
class ProductInfo:
    """Class storing all info gathered from a EarthCARE product's file path."""

    mission_id: FileMissionID
    agency: FileAgency
    latency: FileLatency
    baseline: str
    file_type: FileType
    start_sensing_time: pd.Timestamp
    start_processing_time: pd.Timestamp
    orbit_number: int
    frame_id: str
    orbit_and_frame: str
    name: str
    filepath: str
    hdr_filepath: str

    def to_dict(self) -> dict:
        """Returns data stored in `ProductInfo` as a `dict`."""
        return asdict(self)

    def to_dataframe(self) -> "ProductDataFrame":
        return ProductDataFrame([self])

to_dict

to_dict()

Returns data stored in ProductInfo as a dict.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
def to_dict(self) -> dict:
    """Returns data stored in `ProductInfo` as a `dict`."""
    return asdict(self)

add_depol_ratio

add_depol_ratio(
    ds_anom,
    rolling_w=20,
    near_zero_tolerance=2e-07,
    smooth=True,
    skip_height_above_elevation=300,
    depol_ratio_var="depol_ratio",
    cpol_cleaned_var="cpol_cleaned_for_depol_calculation",
    xpol_cleaned_var="xpol_cleaned_for_depol_calculation",
    depol_ratio_from_means_var="depol_ratio_from_means",
    cpol_var="mie_attenuated_backscatter",
    xpol_var="crosspolar_attenuated_backscatter",
    elevation_var=ELEVATION_VAR,
    height_var=HEIGHT_VAR,
    height_dim=VERTICAL_DIM,
)

Compute depolarization ratio (DPOL = XPOL/CPOL) from attenuated backscatter signals.

This function derives the depolarization ratio from cross-polarized (XPOL) and co-polarized (CPOL) attenuated backscatter signals. Signals below the surface are masked, by default with a vertical margin of 300 meters above elevation to remove potential surface returns. Also, signals are smoothed (or "cleaned") with a rolling mean, and near-zero divisions are suppressed. Cleaned CPOL and XPOL signals are stored alongside DPOL, and a secondary single depol. profile calculated from mean profiles is added (i.e., mean(XPOL)/mean(CPOL)).

Parameters:

ds_anom (Dataset): ATL_NOM_1B dataset containing cross- and co-polar attenuated backscatter. Required.
rolling_w (int): Window size for rolling mean smoothing. Defaults to 20.
near_zero_tolerance (float): Tolerance for masking near-zero CPOL (i.e., denominators). Defaults to 2e-07.
smooth (bool): Whether to apply rolling mean smoothing. Defaults to True.
skip_height_above_elevation (int): Vertical margin above surface elevation to mask, in meters. Defaults to 300.
depol_ratio_var (str): Name for the depol. ratio variable. Defaults to "depol_ratio".
cpol_cleaned_var (str): Name for the cleaned co-polar variable. Defaults to "cpol_cleaned_for_depol_calculation".
xpol_cleaned_var (str): Name for the cleaned cross-polar variable. Defaults to "xpol_cleaned_for_depol_calculation".
depol_ratio_from_means_var (str): Name for the ratio from mean profiles. Defaults to "depol_ratio_from_means".
cpol_var (str): Input co-polar variable name. Defaults to "mie_attenuated_backscatter".
xpol_var (str): Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".
elevation_var (str): Elevation variable name. Defaults to ELEVATION_VAR.
height_var (str): Height variable name. Defaults to HEIGHT_VAR.
height_dim (str): Height dimension name. Defaults to VERTICAL_DIM.

Returns:

xr.Dataset: Dataset with added depol. ratio, cleaned CPOL/XPOL signals, and depol. ratio from mean profiles.

Source code in earthcarekit/utils/read/product/level1/atl_nom_1b.py
def add_depol_ratio(
    ds_anom: xr.Dataset,
    rolling_w: int = 20,
    near_zero_tolerance: float = 2e-7,
    smooth: bool = True,
    skip_height_above_elevation: int = 300,
    depol_ratio_var: str = "depol_ratio",
    cpol_cleaned_var: str = "cpol_cleaned_for_depol_calculation",
    xpol_cleaned_var: str = "xpol_cleaned_for_depol_calculation",
    depol_ratio_from_means_var: str = "depol_ratio_from_means",
    cpol_var: str = "mie_attenuated_backscatter",
    xpol_var: str = "crosspolar_attenuated_backscatter",
    elevation_var: str = ELEVATION_VAR,
    height_var: str = HEIGHT_VAR,
    height_dim: str = VERTICAL_DIM,
) -> xr.Dataset:
    """
    Compute depolarization ratio (`DPOL` = `XPOL`/`CPOL`) from attenuated backscatter signals.

    This function derives the depolarization ratio from cross-polarized (`XPOL`) and
    co-polarized (`CPOL`) attenuated backscatter signals. Signals below the surface
    are masked, by default with a vertical margin of 300 meters above elevation to remove
    potential surface returns. Also, signals are smoothed (or "cleaned") with a rolling mean,
    and near-zero divisions are suppressed. Cleaned `CPOL` and `XPOL` signals are stored alongside `DPOL`,
    and a secondary single depol. profile calculated from mean profiles is added (i.e., mean(`XPOL`)/mean(`CPOL`)).

    Args:
        ds_anom (xr.Dataset): ATL_NOM_1B dataset containing cross- and co-polar attenuated backscatter.
        rolling_w (int, optional): Window size for rolling mean smoothing. Defaults to 20.
        near_zero_tolerance (float, optional): Tolerance for masking near-zero `CPOL` (i.e., denominators). Defaults to 2e-7.
        smooth (bool, optional): Whether to apply rolling mean smoothing. Defaults to True.
        skip_height_above_elevation (int, optional): Vertical margin above surface elevation to mask in meters. Defaults to 300.
        depol_ratio_var (str, optional): Name for depol. ratio variable. Defaults to "depol_ratio".
        cpol_cleaned_var (str, optional): Name for cleaned co-polar variable. Defaults to "cpol_cleaned_for_depol_calculation".
        xpol_cleaned_var (str, optional): Name for cleaned cross-polar variable. Defaults to "xpol_cleaned_for_depol_calculation".
        depol_ratio_from_means_var (str, optional): Name for ratio from mean profiles. Defaults to "depol_ratio_from_means".
        cpol_var (str, optional): Input co-polar variable name. Defaults to "mie_attenuated_backscatter".
        xpol_var (str, optional): Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".
        elevation_var (str, optional): Elevation variable name. Defaults to ELEVATION_VAR.
        height_var (str, optional): Height variable name. Defaults to HEIGHT_VAR.
        height_dim (str, optional): Height dimension name. Defaults to VERTICAL_DIM.

    Returns:
        xr.Dataset: Dataset with added depol. ratio, cleaned `CPOL`/`XPOL` signals,
            and depol. ratio from mean profiles.
    """
    cpol_da = ds_anom[cpol_var].copy()
    xpol_da = ds_anom[xpol_var].copy()
    ds_anom[depol_ratio_var] = xpol_da / cpol_da
    rename_var_info(
        ds_anom,
        depol_ratio_var,
        name=depol_ratio_var,
        long_name="Depol. ratio from cross- and co-polar atten. part. bsc.",
        units="",
    )

    elevation = (
        ds_anom[elevation_var].values.copy()[:, np.newaxis]
        + skip_height_above_elevation
    )
    mask_surface = ds_anom[height_var].values[0].copy() < elevation

    xpol = ds_anom[xpol_var].values
    cpol = ds_anom[cpol_var].values
    xpol[mask_surface] = np.nan
    cpol[mask_surface] = np.nan
    if smooth:
        xpol = rolling_mean_2d(xpol, rolling_w, axis=0)
        cpol = rolling_mean_2d(cpol, rolling_w, axis=0)
    # Mask near-zero denominators in both the smoothed and unsmoothed case,
    # so the mask is always defined before it is applied below
    near_zero_mask = np.isclose(cpol, 0, atol=near_zero_tolerance)
    ds_anom[depol_ratio_var].values = xpol / cpol
    ds_anom[depol_ratio_var].values[near_zero_mask] = np.nan

    xpol[near_zero_mask] = np.nan
    cpol[near_zero_mask] = np.nan

    ds_anom[cpol_cleaned_var] = ds_anom[cpol_var].copy()
    ds_anom[cpol_cleaned_var].values = cpol

    ds_anom[xpol_cleaned_var] = ds_anom[xpol_var].copy()
    ds_anom[xpol_cleaned_var].values = xpol

    dpol_mean = nan_mean(xpol, axis=0) / nan_mean(cpol, axis=0)
    ds_anom[depol_ratio_from_means_var] = xr.DataArray(
        data=dpol_mean,
        dims=[height_dim],
        attrs=dict(
            long_name="Depol. ratio from cross- and co-polar atten. part. bsc.",
            units="",
        ),
    )

    return ds_anom
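
Example (a sketch; the file name is hypothetical, and read_product — documented below — is assumed to return an ATL_NOM_1B dataset with the default variable names):

from earthcarekit import read_product, add_depol_ratio  # assumed import paths

ds = read_product("ECA_EXAE_ATL_NOM_1B_20240801T120000Z_20240801T140000Z_01234E.h5")  # hypothetical file
ds = add_depol_ratio(ds, rolling_w=20, smooth=True)
ds["depol_ratio"]             # per-profile XPOL/CPOL
ds["depol_ratio_from_means"]  # single profile: mean(XPOL)/mean(CPOL)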

get_product_info

get_product_info(filepath, warn=False, must_exist=True)

Gather all info contained in the EarthCARE product's file path.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
def get_product_info(
    filepath: str,
    warn: bool = False,
    must_exist: bool = True,
) -> ProductInfo:
    """Gather all info contained in the EarthCARE product's file path."""
    if _is_url(filepath):
        filepath = _get_path_from_url(filepath)
        must_exist = False

    filepath = os.path.abspath(filepath)

    if must_exist and not os.path.exists(filepath):
        raise FileNotFoundError(f"File does not exist: {filepath}")

    if must_exist:
        pattern = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._..._.._\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{5}[ABCDEFGH]\.h5"
        )
    else:
        pattern = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._..._.._\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{5}[ABCDEFGH].*"
        )
    is_match = bool(pattern.fullmatch(filepath))

    if not is_match:
        pattern_orbit_file = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._......_\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{4}.*"
        )
        is_match = bool(pattern_orbit_file.fullmatch(filepath))

        if not is_match:
            raise ValueError(f"EarthCARE product has invalid file name: {filepath}")

        filename = os.path.basename(filepath).removesuffix(".h5")
        mission_id = FileMissionID.from_input(filename[0:3])
        agency = FileAgency.from_input(filename[4])
        latency = FileLatency.from_input(filename[5])
        baseline = filename[6:8]
        file_type = FileType.from_input(filename[9:19])
        start_sensing_time: pd.Timestamp
        try:
            start_sensing_time = pd.Timestamp(filename[20:35])
        except ValueError as e:
            start_sensing_time = pd.NaT  # type: ignore
        start_processing_time: pd.Timestamp
        try:
            start_processing_time = pd.Timestamp(filename[37:52])
        except ValueError as e:
            start_processing_time = pd.NaT  # type: ignore

        info = ProductInfo(
            mission_id=mission_id,
            agency=agency,
            latency=latency,
            baseline=baseline,
            file_type=file_type,
            start_sensing_time=start_sensing_time,
            start_processing_time=start_processing_time,
            orbit_number=0,
            frame_id="",
            orbit_and_frame="",
            name=filename,
            filepath=filepath,
            hdr_filepath="",
        )

        return info

    product_filepath = filepath.removesuffix(".h5").removesuffix(".HDR") + ".h5"
    if not os.path.exists(product_filepath):
        if warn:
            msg = f"Missing product file: {product_filepath}"
            warnings.warn(msg)
        product_filepath = ""

    hdr_filepath = filepath.removesuffix(".h5").removesuffix(".HDR") + ".HDR"
    if not os.path.exists(hdr_filepath):
        if warn:
            msg = f"Missing product header file: {hdr_filepath}"
            warnings.warn(msg)
        hdr_filepath = ""

    filename = os.path.basename(filepath).removesuffix(".h5").removesuffix(".HDR")
    mission_id = FileMissionID.from_input(filename[0:3])
    agency = FileAgency.from_input(filename[4])
    latency = FileLatency.from_input(filename[5])
    baseline = filename[6:8]
    file_type = FileType.from_input(filename[9:19])
    start_sensing_time = pd.Timestamp(filename[20:35])
    start_processing_time = pd.Timestamp(filename[37:52])
    orbit_number = int(filename[54:59])
    frame_id = filename[59]
    orbit_and_frame = filename[54:60]

    info = ProductInfo(
        mission_id=mission_id,
        agency=agency,
        latency=latency,
        baseline=baseline,
        file_type=file_type,
        start_sensing_time=start_sensing_time,
        start_processing_time=start_processing_time,
        orbit_number=orbit_number,
        frame_id=frame_id,
        orbit_and_frame=orbit_and_frame,
        name=filename,
        filepath=product_filepath,
        hdr_filepath=hdr_filepath,
    )

    return info
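
Example (a sketch parsing a hypothetical file name; must_exist=False skips the existence check):

from earthcarekit import get_product_info  # assumed import path

info = get_product_info(
    "ECA_EXAE_ATL_NOM_1B_20240801T120000Z_20240801T140000Z_01234E.h5",  # hypothetical name
    must_exist=False,
)
info.file_type     # -> FileType.ATL_NOM_1B
info.baseline      # -> "AE"
info.orbit_number  # -> 1234
info.frame_id      # -> "E"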

get_product_infos

get_product_infos(filepaths, **kwargs)

Extracts product metadata from EarthCARE product file paths (e.g. file_type, orbit_number, frame_id, baseline, ...).

Parameters:

filepaths (str | list[str] | NDArray | DataFrame | Dataset): Input sources for EarthCARE product files. Required. Can be one of:
- str -> A single file path.
- list[str] or numpy.ndarray -> A list or array of file paths.
- pandas.DataFrame -> Must contain a 'filepath' column.
- xarray.Dataset -> Must have encoding with attribute 'source' (str) or 'sources' (list[str]).
**kwargs: Additional arguments passed to get_product_info().

Returns:

ProductDataFrame: A dataframe containing extracted product information.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
def get_product_infos(
    filepaths: str | list[str] | NDArray | pd.DataFrame | xr.Dataset,
    **kwargs,
) -> "ProductDataFrame":
    """
    Extracts product metadata from EarthCARE product file paths (e.g. file_type, orbit_number, frame_id, baseline, ...).

    Args:
        filepaths:
            Input sources for EarthCARE product files. Can be one of
            - `str` -> A single file path.
            - `list[str]` or `numpy.ndarray` -> A list or array of file paths.
            - `pandas.DataFrame` -> Must contain a 'filepath' column.
            - `xarray.Dataset` -> Must have encoding with attribute 'source' (`str`) or 'sources' (`list[str]`).
        **kwargs: Additional arguments passed to `get_product_info()`.

    Returns:
        ProductDataFrame: A dataframe containing extracted product information.
    """
    _filepaths: list[str] | NDArray
    if isinstance(filepaths, (str, np.str_)):
        _filepaths = [str(filepaths)]
    elif isinstance(filepaths, xr.Dataset):
        ds: xr.Dataset = filepaths
        if not hasattr(ds, "encoding"):
            raise ValueError(f"Dataset missing encoding attribute.")
        elif "source" in ds.encoding:
            _filepaths = [ds.encoding["source"]]
        elif "sources" in ds.encoding:
            _filepaths = ds.encoding["sources"]
        else:
            raise ValueError(f"Dataset encoding does not contain source or sources.")
    elif isinstance(filepaths, pd.DataFrame):
        df: pd.DataFrame = filepaths
        if "filepath" in df:
            _filepaths = df["filepath"].to_numpy()
        else:
            raise ValueError(
                f"""Given dataframe does not contain a column of file paths. A valid file path column name is "filepath"."""
            )
    else:
        _filepaths = filepaths

    infos = []
    for filepath in _filepaths:
        try:
            infos.append(get_product_info(filepath, **kwargs).to_dict())
        except ValueError as e:
            continue
    pdf = ProductDataFrame(infos)
    pdf.validate_columns()
    return pdf
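
Example (a sketch with a hypothetical file name; keyword arguments such as must_exist are forwarded to get_product_info()):

from earthcarekit import get_product_infos  # assumed import path

df = get_product_infos(
    ["ECA_EXAE_ATL_NOM_1B_20240801T120000Z_20240801T140000Z_01234E.h5"],  # hypothetical
    must_exist=False,
)
df[["file_type", "orbit_number", "frame_id"]]  # one row per parsed file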

read_any

read_any(input, **kwargs)

Reads various input types and returns an xarray.Dataset.

This function can read
  • EarthCARE product files (.h5)
  • NetCDF files (.nc)
  • Manually processed PollyXT output files (.txt)

Parameters:

input (str | Dataset): File path or existing Dataset. Required.
**kwargs: Additional keyword arguments for specific readers.

Returns:

xr.Dataset: Opened dataset.

Raises:

ValueError: If the file type is not supported.
TypeError: If the input type is invalid.

Source code in earthcarekit/utils/read/_read_any.py
def read_any(input: str | xr.Dataset, **kwargs) -> xr.Dataset:
    """Reads various input types and returns an `xarray.Dataset`.

    This function can read:
        - EarthCARE product files (`.h5`)
        - NetCDF files (`.nc`)
        - Manually processed PollyXT output files (`.txt`)

    Args:
        input (str | xr.Dataset): File path or existing Dataset.
        **kwargs: Additional keyword arguments for specific readers.

    Returns:
        xr.Dataset: Opened dataset.

    Raises:
        ValueError: If the file type is not supported.
        TypeError: If the input type is invalid.
    """
    if isinstance(input, xr.Dataset):
        return input
    elif isinstance(input, str):
        filepath = input

        if is_earthcare_product(filepath=filepath):
            return read_product(filepath, **kwargs)

        filename = os.path.basename(filepath)
        _, ext = os.path.splitext(filename)
        if ext.lower() == ".txt":
            return read_polly(filepath)
        elif ext.lower() == ".nc":
            return read_nc(filepath, **kwargs)

        raise ValueError(f"Reading of file not supported: <{input}>")
    raise TypeError(f"Invalid type '{type(input).__name__}' for input.")
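
Example (a sketch of the dispatch behavior; all file paths are hypothetical):

from earthcarekit import read_any  # assumed import path

ds = read_any("ECA_EXAE_ATL_NOM_1B_20240801T120000Z_20240801T140000Z_01234E.h5")  # -> read_product()
ds = read_any("ground_site.nc")       # -> read_nc()
ds = read_any("pollyxt_profile.txt")  # -> read_polly()
ds = read_any(ds)                     # an existing Dataset is returned unchanged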

read_header_data

read_header_data(source: str) -> xr.Dataset
read_header_data(source: Dataset) -> xr.Dataset
read_header_data(source)

Opens the product header groups of an EarthCARE file as an xarray.Dataset.

Source code in earthcarekit/utils/read/product/header_group.py
def read_header_data(source: str | xr.Dataset) -> xr.Dataset:
    """Opens the product header groups of a EarthCARE file as a `xarray.Dataset`."""
    if isinstance(source, str):
        filepath = source
    elif isinstance(source, xr.Dataset):
        filepath = source.encoding.get("source", None)
        if filepath is None:
            raise ValueError(f"Dataset missing source attribute")
    else:
        raise TypeError("Expected 'str' or 'xarray.Dataset'")

    groups = xr.open_groups(filepath)
    header_groups = {n: g for n, g in groups.items() if "HeaderData" in n}

    # Rename duplicate vars

    all_vars = {}
    header_datasets = []
    for i, (group_name, ds) in enumerate(header_groups.items()):
        ds_new = ds.copy()
        for var in ds.data_vars:
            if var in all_vars:
                new_name = f"{group_name.split('/')[-1]}_{var}"
                ds_new = ds_new.rename({var: new_name})
            else:
                all_vars[var] = True
        header_datasets.append(ds_new)

    ds = xr.merge(header_datasets)

    ds = _convert_all_fill_values_to_nan(ds)

    # Convert timestamps to numpy datetime
    for var in [
        "Creation_Date",
        "Validity_Start",
        "Validity_Stop",
        "ANXTime",
        "frameStartTime",
        "frameStopTime",
        "processingStartTime",
        "processingStopTime",
        "sensingStartTime",
        "sensingStopTime",
        "stateVectorTime",
    ]:
        if var in ds:
            raw = ds[var].values
            formatted = np.char.replace(raw, "UTC=", "")
            ds[var].values = formatted.astype("datetime64[ns]")

    # Ensure that strings are correctly decoded
    for var in ["frameID"]:
        if var in ds:
            ds = convert_scalar_var_to_str(ds, var)

    # Remove dimensions of size == 1
    ds = ds.squeeze()

    return ds
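
Example (a sketch; the file name is hypothetical):

from earthcarekit import read_header_data  # assumed import path

hdr = read_header_data("ECA_EXAE_ATL_NOM_1B_20240801T120000Z_20240801T140000Z_01234E.h5")  # hypothetical
hdr["sensingStartTime"]  # decoded to numpy datetime64 (the "UTC=" prefix is stripped)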

read_nc

read_nc(input, modify=True, in_memory=False, **kwargs)

Returns an xarray.Dataset from a Dataset or NetCDF file path, optionally loaded into memory.

Parameters:

input (Dataset or str): Path to a NetCDF file. If an already opened xarray.Dataset object is passed, it is returned as is. Required.
modify (bool): If True, default modifications to the opened dataset will be applied (e.g., converting heights in Polly data from height a.g.l. to height above mean sea level). Defaults to True.
in_memory (bool): If True, ensures the dataset is fully loaded into memory. Defaults to False.
**kwargs: Keyword arguments passed to xarray.open_dataset().

Returns:

xarray.Dataset: The resulting dataset.

Raises:

TypeError: If input is not a Dataset or string.

Source code in earthcarekit/utils/read/_read_nc.py
def read_nc(
    input: str | xr.Dataset,
    modify: bool = True,
    in_memory: bool = False,
    **kwargs,
) -> xr.Dataset:
    """Returns an `xarray.Dataset` from a Dataset or NetCDF file path, optionally loaded into memory.

    Args:
        input (xarray.Dataset or str): Path to a NetCDF file. If an already opened `xarray.Dataset` object is passed, it is returned as is.
        modify (bool): If True, default modifications to the opened dataset will be applied
            (e.g., converting heights in Polly data from height a.g.l. to height above mean sea level).
        in_memory (bool, optional): If True, ensures the dataset is fully loaded into memory. Defaults to False.
        **kwargs: Keyword arguments passed to `xarray.open_dataset()`.

    Returns:
        xarray.Dataset: The resulting dataset.

    Raises:
        TypeError: If input is not a Dataset or string.
    """
    ds: xr.Dataset
    if isinstance(input, xr.Dataset):
        ds = input
    elif isinstance(input, str):
        if in_memory:
            with _read_nc(input, modify=modify, **kwargs) as ds:
                ds = ds.load()
        else:
            ds = _read_nc(input, modify=modify, **kwargs)
    else:
        raise TypeError(
            "Invalid input type! Expecting an opened NetCDF dataset (xarray.Dataset) or a path to a NetCDF file."
        )
    return ds
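
Example (a sketch; the file name is hypothetical):

from earthcarekit import read_nc  # assumed import path

ds = read_nc("ground_site.nc", in_memory=True)  # fully loads the data, so the file handle is released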

read_polly

read_polly(input)

Reads manually processed PollyXT output text files as xarray.Dataset or returns an already open one.

Source code in earthcarekit/utils/read/_read_polly.py
def read_polly(input: str | xr.Dataset) -> xr.Dataset:
    """Reads manually processed PollyXT output text files as `xarray.Dataset` or returns an already open one."""

    if isinstance(input, xr.Dataset):
        return input

    with open(input, "r", encoding="utf-8", errors="ignore") as f:
        df = pd.read_csv(f, sep="\t")

    new_columns = [_parse_column_name(c) for c in df.columns]
    new_column_names = [c.name for c in new_columns]
    new_column_names = _make_column_names_unique(new_column_names)
    df.columns = pd.Index(new_column_names)

    ds = xr.Dataset.from_dataframe(df)
    ds = ds.assign_coords(index=ds.height.values)
    ds = ds.rename({"index": "vertical"})
    if "time" not in ds:
        ds = ds.assign({"time": np.datetime64("1970-01-01T00:00:00.000", "ms")})

    vars_order = ["time"] + [v for v in ds.data_vars if v != "time"]
    ds = ds[vars_order]

    for c in new_columns:
        if c.units == "km":
            ds[c.name].values = ds[c.name].values * 1e3
            c.units = c.units.replace("k", "")
        elif c.units in ["Mm-1 sr-1", "Mm-1", "Msr-1"]:
            ds[c.name].values = ds[c.name].values / 1e6
            c.units = c.units.replace("M", "")

        ds[c.name] = ds[c.name].assign_attrs(
            dict(
                long_name=c.long_name,
                units=c.units,
            )
        )
    return ds
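
Example (a sketch; the file name is hypothetical):

from earthcarekit import read_polly  # assumed import path

ds = read_polly("pollyxt_profile.txt")  # hypothetical manually processed PollyXT output
# Columns with km or Mm-1 (sr-1) units are converted to m and m-1 (sr-1) on read.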

read_product

read_product(
    input,
    trim_to_frame=True,
    modify=DEFAULT_READ_EC_PRODUCT_MODIFY,
    header=DEFAULT_READ_EC_PRODUCT_HEADER,
    meta=DEFAULT_READ_EC_PRODUCT_META,
    in_memory=False,
    **kwargs
)

Returns an xarray.Dataset from a Dataset or EarthCARE file path, optionally loaded into memory.

Parameters:

input (str or Dataset): Path to an EarthCARE file. If an xarray.Dataset is given, it will be returned as is. Required.
trim_to_frame (bool): Whether to trim the dataset to latitude frame bounds. Defaults to True.
modify (bool): If True, default modifications to the opened dataset will be applied (e.g., renaming the dimension corresponding to height to "vertical"). Defaults to True.
header (bool): If True, all header data will be included in the dataset. Defaults to False.
meta (bool): If True, select metadata from the header (like orbit number and frame ID) will be included in the dataset. Defaults to True.
in_memory (bool): If True, ensures the dataset is fully loaded into memory. Defaults to False.

Returns:

xarray.Dataset: The resulting dataset.

Raises:

TypeError: If input is not a Dataset or string.

Source code in earthcarekit/utils/read/product/_generic.py
def read_product(
    input: str | Dataset,
    trim_to_frame: bool = True,
    modify: bool = DEFAULT_READ_EC_PRODUCT_MODIFY,
    header: bool = DEFAULT_READ_EC_PRODUCT_HEADER,
    meta: bool = DEFAULT_READ_EC_PRODUCT_META,
    in_memory: bool = False,
    **kwargs,
) -> Dataset:
    """Returns an `xarray.Dataset` from a Dataset or EarthCARE file path, optionally loaded into memory.

    Args:
        input (str or xarray.Dataset): Path to an EarthCARE file. If an `xarray.Dataset` is given, it will be returned as is.
        trim_to_frame (bool, optional): Whether to trim the dataset to latitude frame bounds. Defaults to True.
        modify (bool): If True, default modifications to the opened dataset will be applied
            (e.g., renaming the dimension corresponding to height to "vertical"). Defaults to True.
        header (bool): If True, all header data will be included in the dataset. Defaults to False.
        meta (bool): If True, select metadata from the header (like orbit number and frame ID) will be included in the dataset. Defaults to True.
        in_memory (bool, optional): If True, ensures the dataset is fully loaded into memory. Defaults to False.

    Returns:
        xarray.Dataset: The resulting dataset.

    Raises:
        TypeError: If input is not a Dataset or string.
    """
    ds: Dataset
    if isinstance(input, Dataset):
        ds = input
    elif isinstance(input, str):
        if in_memory:
            with _read_product(
                filepath=input,
                trim_to_frame=trim_to_frame,
                modify=modify,
                header=header,
                meta=meta,
                **kwargs,
            ) as ds:
                ds = ds.load()
        else:
            ds = _read_product(
                filepath=input,
                trim_to_frame=trim_to_frame,
                modify=modify,
                header=header,
                meta=meta,
                **kwargs,
            )
    else:
        raise TypeError(
            "Invalid input type! Expecting an opened EarthCARE dataset (xarray.Dataset) or a path to an EarthCARE product."
        )
    return ds
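
Example (a sketch; the file name is hypothetical):

from earthcarekit import read_product  # assumed import path

ds = read_product(
    "ECA_EXAE_ATL_NOM_1B_20240801T120000Z_20240801T140000Z_01234E.h5",  # hypothetical
    header=True,     # also attach all header data
    in_memory=True,  # fully load into memory
)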

read_products

read_products(
    filepaths,
    zoom_at=None,
    along_track_dim=ALONG_TRACK_DIM,
    func=None,
    func_inputs=None,
    max_num_files=8,
    coarsen=True,
)

Read and concatenate a sequence of EarthCARE frames into a single xarray Dataset.

By default, the dataset is coarsened according to the number of input frames (e.g., combining 3 products averages every 3 profiles, so the along-track dimension remains comparable to a single product). Optionally applies a processing function to each frame and zooms in on a specific region (defined by zoom_at) without coarsening. Coarsening can also be turned off, but doing so may cause memory issues.

Parameters:

filepaths (Sequence[str] or DataFrame): EarthCARE product file paths as a list or a DataFrame with metadata including filepath, orbit_number, and frame_id. Required.
zoom_at (float): If set, selects only a zoomed-in portion of the frames around this fractional index. Defaults to None.
along_track_dim (str): Name of the dimension to concatenate along. Defaults to ALONG_TRACK_DIM.
func (Callable): Function to apply to each frame after loading. Defaults to None.
func_inputs (Sequence[dict]): Optional per-frame arguments to pass to func. Defaults to None.
max_num_files (int): Max. number of files that are allowed to be loaded at once. A ValueError is raised if exceeded. Defaults to 8 (i.e., a full orbit).
coarsen (bool): If True, the read datasets are coarsened depending on the number of given files. Only applicable when not zooming. Defaults to True.

Returns:

Dataset: Concatenated dataset with all frames along along_track_dim.

Source code in earthcarekit/utils/read/product/_concat.py
def read_products(
    filepaths: Sequence[str] | pd.DataFrame,
    zoom_at: float | None = None,
    along_track_dim: str = ALONG_TRACK_DIM,
    func: Callable | None = None,
    func_inputs: Sequence[dict] | None = None,
    max_num_files: int = 8,
    coarsen: bool = True,
) -> Dataset:
    """Read and concatenate a sequence of EarthCARE frames into a single xarray Dataset.

    By default, the dataset is coarsened according to the number of input frames (e.g.,
    combining 3 products averages every 3 profiles, so the along-track dimension remains
    comparable to a single product). Optionally applies a processing function to each
    frame and zooms in on a specific region (defined by `zoom_at`) without coarsening.
    Coarsening can also be turned off, but doing so may cause memory issues.

    Args:
        filepaths (Sequence[str] or pandas.DataFrame):
            EarthCARE product file paths as a list or a DataFrame with metadata
            including `filepath`, `orbit_number`, and `frame_id`.
        zoom_at (float, optional):
            If set, selects only a zoomed-in portion of the frames around this
            fractional index. Defaults to None.
        along_track_dim (str, optional):
            Name of the dimension to concatenate along. Defaults to ALONG_TRACK_DIM.
        func (Callable, optional):
            Function to apply to each frame after loading. Defaults to None.
        func_inputs (Sequence[dict], optional):
            Optional per-frame arguments to pass to `func`. Defaults to None.
        max_num_files (int, optional):
            Max. number of files that are allowed to be loaded at once.
            A `ValueError` is raised if exceeded. Defaults to 8 (i.e., a full orbit).
        coarsen (bool, optional):
            If True, the read datasets are coarsened depending on the number of given files.
            Only applicable when not zooming. Defaults to True.

    Returns:
        Dataset: Concatenated dataset with all frames along `along_track_dim`.
    """
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    elif isinstance(filepaths, pd.DataFrame):
        df = filepaths.sort_values(by="filepath")
        filepaths = df["filepath"].tolist()
    else:
        df = ProductDataFrame.from_files(list(filepaths)).sort_values(by="filepath")
        df.validate_columns()
        filepaths = df["filepath"].tolist()

    if len(filepaths) == 0:
        raise ValueError("Given sequence of product file paths is empty")
    elif len(filepaths) == 1:
        warnings.warn("Cannot concatenate frames since only one file path was given")
        return read_product(filepaths[0])
    elif len(filepaths) > max_num_files:
        raise ValueError(
            f"Too many files provided: {len(filepaths)} (currently maximum allowed is {max_num_files}). "
            "Please reduce the number of files or increase the allowed amount by setting the argument max_num_files."
        )
    elif len(filepaths) > 8:
        warnings.warn(
            f"You provided {len(filepaths)} files, which is more than one full orbit (8 files). "
            "Processing might take longer than usual."
        )

    # # Construct filename suffix from orbit/frame numbers
    # orbit_start = str(df["orbit_number"].iloc[0]).zfill(5)
    # orbit_end = str(df["orbit_number"].iloc[-1]).zfill(5)
    # frame_start = df["frame_id"].iloc[0]
    # frame_end = df["frame_id"].iloc[-1]

    # if orbit_start == orbit_end:
    #     oaf_string = (
    #         f"{orbit_start}{frame_start}"
    #         if frame_start == frame_end
    #         else f"{orbit_start}{frame_start}-{frame_end}"
    #     )
    # else:
    #     oaf_string = f"{orbit_start}{frame_start}-{orbit_end}{frame_end}"

    def apply_func(ds: Dataset, i: int) -> Dataset:
        """Apply a processing function to a dataset if specified."""
        if func is None:
            return ds
        if func_inputs is None:
            return func(ds)
        if i < len(func_inputs):
            return func(ds, **func_inputs[i])
        raise IndexError("Too few function inputs provided")

    num_files = len(filepaths)
    ds: xr.Dataset | None = None

    if zoom_at is not None:
        # Zoomed read: select portions of two adjacent frames
        frame_indices = np.unique([int(np.floor(zoom_at)), int(np.ceil(zoom_at))])
        offset = zoom_at - frame_indices[0]
        filepaths = [filepaths[i] for i in frame_indices]

        for i, filepath in enumerate(filepaths):
            with read_product(filepath) as frame_ds:
                frame_ds = apply_func(frame_ds, frame_indices[i])

                # Preserve original dtypes
                original_dtypes = {v: frame_ds[v].dtype for v in frame_ds.variables}

                # Select relevant portion of the frame
                n = len(frame_ds[along_track_dim])
                sel_slice = (
                    slice(int(np.floor(n * offset)), n)
                    if i == 0
                    else slice(0, int(np.ceil(n * offset)))
                )
                frame_ds = frame_ds.sel({along_track_dim: sel_slice})

                # Restore dtypes
                for v, dtype in original_dtypes.items():
                    frame_ds[v] = frame_ds[v].astype(dtype)

                ds = (
                    frame_ds.copy()
                    if ds is None
                    else concat_datasets(
                        ds.copy(), frame_ds.copy(), dim=along_track_dim
                    )
                )

    else:
        # Full read and coarsen each frame
        for i, filepath in enumerate(filepaths):
            with read_product(filepath) as frame_ds:
                frame_ds = apply_func(frame_ds, i)

                if coarsen:
                    original_dtypes = {v: frame_ds[v].dtype for v in frame_ds.variables}

                    coarsen_dims = {along_track_dim: num_files}

                    # Circular mean for longitude
                    lon_coarse = (
                        frame_ds["longitude"]
                        .coarsen(coarsen_dims, boundary="trim")
                        .reduce(circular_mean_np)
                    )
                    _tmp_attrs = lon_coarse.attrs
                    lon_coarse.attrs = {}

                    # Regular mean for the rest
                    rest = (
                        frame_ds.drop_vars("longitude")
                        .coarsen(coarsen_dims, boundary="trim")
                        .mean()  # type: ignore
                    )

                    # Merge results
                    frame_ds = xr.merge([lon_coarse, rest])
                    frame_ds["longitude"].attrs = _tmp_attrs

                    for v, dtype in original_dtypes.items():
                        frame_ds[v] = frame_ds[v].astype(dtype)

                ds = (
                    frame_ds
                    if ds is None
                    else concat_datasets(ds, frame_ds, dim=along_track_dim)
                )

    # Set output file sources
    if isinstance(ds, Dataset):
        ds.encoding["sources"] = list(filepaths)
        return ds
    else:
        raise RuntimeError("Unexpected state: no dataset was read")
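
Example (a sketch; both file names are hypothetical consecutive frames of one orbit):

from earthcarekit import read_products  # assumed import path

paths = [
    "ECA_EXAE_ATL_NOM_1B_20240801T120000Z_20240801T140000Z_01234E.h5",
    "ECA_EXAE_ATL_NOM_1B_20240801T130000Z_20240801T150000Z_01234F.h5",
]
ds = read_products(paths)               # concatenated and coarsened by a factor of 2
ds = read_products(paths, zoom_at=0.5)  # instead zoom on the E/F frame boundary, no coarsening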

read_science_data

read_science_data(filepath, agency=None, **kwargs)

Opens the science data of an EarthCARE file as an xarray.Dataset.

Source code in earthcarekit/utils/read/product/science_group.py
def read_science_data(
    filepath: str,
    agency: Union["FileAgency", None] = None,
    **kwargs,
) -> xr.Dataset:
    """Opens the science data of a EarthCARE file as a `xarray.Dataset`."""
    from .file_info.agency import (
        FileAgency,  # Imported inside function to avoid circular import error
    )

    if agency is None:
        agency = FileAgency.from_input(filepath)

    if agency == FileAgency.ESA:
        ds = xr.open_dataset(filepath, group="ScienceData", **kwargs)
    elif agency == FileAgency.JAXA:
        df_cpr_geo = xr.open_dataset(
            filepath, group="ScienceData/Geo", engine="h5netcdf", phony_dims="sort"
        )
        df_cpr_data = xr.open_dataset(
            filepath, group="ScienceData/Data", engine="h5netcdf", phony_dims="sort"
        )
        ds = xr.merge([df_cpr_data, df_cpr_geo])
        ds.encoding["source"] = df_cpr_data.encoding["source"]
    else:
        raise NotImplementedError()

    ds = _convert_all_fill_values_to_nan(ds)

    return ds
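
Example (a sketch; the file name is hypothetical):

from earthcarekit import read_science_data  # assumed import path

ds = read_science_data("ECA_EXAE_ATL_NOM_1B_20240801T120000Z_20240801T140000Z_01234E.h5")  # hypothetical
# For ESA files the "ScienceData" group is opened; for JAXA files the
# "ScienceData/Geo" and "ScienceData/Data" groups are opened and merged.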

rebin_xmet_to_vertical_track

rebin_xmet_to_vertical_track(
    ds_xmet,
    ds_vert,
    vars=None,
    k=4,
    eps=1e-12,
    lat_var=TRACK_LAT_VAR,
    lon_var=TRACK_LON_VAR,
    time_var=TIME_VAR,
    height_var=HEIGHT_VAR,
    along_track_dim=ALONG_TRACK_DIM,
    height_dim=VERTICAL_DIM,
    xmet_lat_var="latitude",
    xmet_lon_var="longitude",
    xmet_height_var="geometrical_height",
    xmet_height_dim="height",
    xmet_horizontal_grid_dim="horizontal_grid",
)

Rebins variables from an AUX_MET_1D (XMET) dataset onto the vertical curtain track given by another dataset (e.g. ATL_EBD_2A).

This function interpolates selected variables from ds_xmet onto an EarthCARE vertical track given in ds_vert, using a fast horizontal kd-tree nearest-neighbor search with scipy.spatial.cKDTree, followed by averaging the k-nearest vertical XMET profiles using inverse distance weighting. The resulting profiles are then interpolated in the vertical to match the height resolution of ds_vert.

Parameters:

ds_xmet (Dataset): The source XMET dataset from which the vertical curtain along the track will be interpolated. Required.
ds_vert (Dataset): The target dataset containing the vertical curtain track. Required.
vars (list[str] | None): List of variable names from ds_xmet to rebin. If None, all data variables are considered. Defaults to None.
k (int): Number of nearest horizontal neighbors to include in the kd-tree search. Defaults to 4.
eps (float): Numerical threshold to avoid division by zero in distance calculations during the kd-tree search. Defaults to 1e-12.
lat_var (str): Name of the latitude variable in ds_vert. Defaults to TRACK_LAT_VAR.
lon_var (str): Name of the longitude variable in ds_vert. Defaults to TRACK_LON_VAR.
time_var (str): Name of the time variable in ds_vert. Defaults to TIME_VAR.
height_var (str): Name of the height variable in ds_vert. Defaults to HEIGHT_VAR.
along_track_dim (str): Name of the along-track dimension in ds_vert. Defaults to ALONG_TRACK_DIM.
height_dim (str): Name of the vertical or height dimension in ds_vert. Defaults to VERTICAL_DIM.
xmet_lat_var (str): Name of the latitude variable in ds_xmet. Defaults to "latitude".
xmet_lon_var (str): Name of the longitude variable in ds_xmet. Defaults to "longitude".
xmet_height_var (str): Name of the height variable in ds_xmet. Defaults to "geometrical_height".
xmet_height_dim (str): Name of the vertical dimension in ds_xmet. Defaults to "height".
xmet_horizontal_grid_dim (str): Name of the horizontal grid dimension in ds_xmet. Defaults to "horizontal_grid".

Returns:

xr.Dataset: A new dataset containing the selected XMET variables interpolated to the grid of the vertical curtain given in ds_vert. This new dataset has the same along-track and vertical dimensions as ds_vert.

Raises:

KeyError: If any specified variable or coordinate name is not found in ds_xmet.

Source code in earthcarekit/utils/read/product/auxiliary/aux_met_1d.py
def rebin_xmet_to_vertical_track(
    ds_xmet: xr.Dataset,
    ds_vert: xr.Dataset,
    vars: list[str] | None = None,
    k: int = 4,
    eps: float = 1e-12,
    lat_var: str = TRACK_LAT_VAR,
    lon_var: str = TRACK_LON_VAR,
    time_var: str = TIME_VAR,
    height_var: str = HEIGHT_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    height_dim: str = VERTICAL_DIM,
    xmet_lat_var: str = "latitude",
    xmet_lon_var: str = "longitude",
    xmet_height_var: str = "geometrical_height",
    xmet_height_dim: str = "height",
    xmet_horizontal_grid_dim: str = "horizontal_grid",
) -> xr.Dataset:
    """
    Rebins variables from an AUX_MET_1D (XMET) dataset onto the vertical curtain track given by another dataset (e.g. ATL_EBD_2A).

    This function interpolates selected variables from `ds_xmet` onto an EarthCARE
    vertical track given in `ds_vert`, using a fast horizontal kd-tree nearest-neighbor search with `scipy.spatial.cKDTree`, followed
    by averaging the `k`-nearest vertical XMET profiles using inverse distance weighting. The resulting
    profiles are then interpolated in the vertical to match the height resolution of `ds_vert`.

    Args:
        ds_xmet (xr.Dataset): The source XMET dataset from which the vertical curtain along the track will be interpolated.
        ds_vert (xr.Dataset): The target dataset containing the vertical curtain track.
        vars (list[str] | None, optional): List of variable names from `ds_xmet` to rebin.
            If None, all data variables are considered.
        k (int, optional): Number of nearest horizontal neighbors to include in the kd-tree search.
            Defaults to 4.
        eps (float, optional): Numerical threshold to avoid division by zero in distance calculations during the kd-tree search.
            Defaults to 1e-12.
        lat_var (str, optional): Name of the latitude variable in `ds_vert`.
            Defaults to TRACK_LAT_VAR.
        lon_var (str, optional): Name of the longitude variable in `ds_vert`.
            Defaults to TRACK_LON_VAR.
        time_var (str, optional): Name of the time variable in `ds_vert`.
            Defaults to TIME_VAR.
        height_var (str, optional): Name of the height variable in `ds_vert`.
            Defaults to HEIGHT_VAR.
        along_track_dim (str, optional): Name of the along-track dimension in `ds_vert`.
            Defaults to ALONG_TRACK_DIM.
        height_dim (str, optional): Name of the vertical or height dimension in `ds_vert`.
            Defaults to VERTICAL_DIM.
        xmet_lat_var (str, optional): Name of the latitude variable in `ds_xmet`.
            Defaults to "latitude".
        xmet_lon_var (str, optional): Name of the longitude variable in `ds_xmet`.
            Defaults to "longitude".
        xmet_height_var (str, optional): Name of the height variable in `ds_xmet`.
            Defaults to "geometrical_height".
        xmet_height_dim (str, optional): Name of the vertical dimension in `ds_xmet`.
            Defaults to "height".
        xmet_horizontal_grid_dim (str, optional): Name of the horizontal grid dimension in `ds_xmet`.
            Defaults to "horizontal_grid".

    Returns:
        xr.Dataset: A new dataset containing the selected XMET variables interpolated to the grid of the
            vertical curtain given in `ds_vert`. This new dataset has the same along-track and vertical
            dimensions as `ds_vert`.

    Raises:
        KeyError: If any specified variable or coordinate name is not found in `ds_xmet`.
    """
    if vars is None:
        vars = [str(v) for v in ds_xmet.variables]
    else:
        for var in vars:
            if var not in ds_xmet.variables:
                present_vars = [str(v) for v in ds_xmet.variables]
                raise KeyError(
                    f"""X-MET dataset does not contain variable "{var}". Present variables are: {", ".join(present_vars)}"""
                )

    new_ds_xmet = ds_xmet.copy().swap_dims({xmet_height_dim: "tmp_xmet_height"})
    new_ds_xmet[time_var] = ds_vert[time_var].copy()
    new_ds_xmet[height_var] = ds_vert[height_var].copy()

    hgrid_lat = ds_xmet[xmet_lat_var].values.flatten()
    hgrid_lon = ds_xmet[xmet_lon_var].values.flatten()
    hgrid_alt = ds_xmet[xmet_height_var].values
    hgrid_coords = sequence_geo_to_ecef(hgrid_lat, hgrid_lon)

    track_lat = ds_vert[lat_var].values
    track_lon = ds_vert[lon_var].values
    track_alt = ds_vert[height_var].values
    track_coords = sequence_geo_to_ecef(track_lat, track_lon)

    tree = cKDTree(hgrid_coords)
    dists, idxs = tree.query(track_coords, k=k)

    # Inverse distance weighting
    if k > 1:
        weights = 1.0 / (dists + eps)
        weights /= np.sum(weights, axis=1, keepdims=True)
        height = np.einsum("ij,ijh->ih", weights, hgrid_alt[idxs])
    else:
        weights = np.ones(idxs.shape)
        height = hgrid_alt[idxs]

    dims: str | tuple[str, str]
    for var in vars:
        values = ds_xmet[var].values
        if len(values.shape) == 0:
            continue

        if len(values.shape) == 1:
            dims = along_track_dim

            if k > 1:
                result = np.sum(values[idxs] * weights, axis=1)
                new_values = result
            else:
                new_values = values[idxs]
        else:
            dims = (along_track_dim, height_dim)

            if k > 1:
                result = np.einsum("ij,ijh->ih", weights, values[idxs])
            else:
                result = values[idxs]

            new_values = np.empty(track_alt.shape)
            new_values[:] = np.nan

            for i in np.arange(track_alt.shape[0]):
                _new_values = np.interp(
                    track_alt[i],
                    height[i],
                    result[i],
                )
                # _new_values = interp(track_alt[i])

                # Fill nans
                # _new_values[np.isnan(_new_values) & (track_alt[i] < height[i, 0])] = result[i, 0]
                # _new_values[np.isnan(_new_values) & (track_alt[i] > height[i, -1])] = result[i, -1]

                new_values[i] = _new_values

        new_var = f"{var}"
        new_ds_xmet[new_var] = (dims, new_values)
        new_ds_xmet[new_var].attrs = ds_xmet[var].attrs

    new_ds_xmet = remove_dims(new_ds_xmet, [xmet_horizontal_grid_dim, xmet_height_dim])

    return new_ds_xmet
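
Example (a sketch; file names are hypothetical and the XMET variable name "temperature" is an assumption, not guaranteed):

from earthcarekit import read_product, rebin_xmet_to_vertical_track  # assumed import paths

ds_xmet = read_product("ECA_EXAE_AUX_MET_1D_20240801T120000Z_20240801T140000Z_01234E.h5")  # hypothetical
ds_aebd = read_product("ECA_EXAE_ATL_EBD_2A_20240801T120000Z_20240801T140000Z_01234E.h5")  # hypothetical
ds_met = rebin_xmet_to_vertical_track(ds_xmet, ds_aebd, vars=["temperature"], k=4)
# ds_met now shares the along-track and vertical dimensions of ds_aebd.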

search_files_by_regex

search_files_by_regex(root_dirpath, regex_pattern)

Recursively searches for files in a directory that match a given regex pattern.

Parameters:

root_dirpath (str): The root directory to start the search from. Required.
regex_pattern (str): A regular expression pattern to match file names against. Required.

Returns:

list[str]: A list of absolute file paths that point to files with matching names.

Raises:

FileNotFoundError: If the root directory does not exist.
re.error: If the given pattern is not a valid regular expression.

Source code in earthcarekit/utils/read/search.py
def search_files_by_regex(root_dirpath: str, regex_pattern: str) -> list[str]:
    """
    Recursively searches for files in a directory that match a given regex pattern.

    Args:
        root_dirpath (str): The root directory to start the search from.
        regex_pattern (str): A regular expression pattern to match file names against.

    Returns:
        list[str]: A list of absolute file paths that point to files with matching names.

    Raises:
        FileNotFoundError: If the root directory does not exist.
        re.error: If the given pattern is not a valid regular expression.
    """
    if not os.path.exists(root_dirpath):
        raise FileNotFoundError(
            f"{search_files_by_regex.__name__}() Root directory does not exist: {root_dirpath}"
        )

    filepaths = []
    for dirpath, _, filenames in os.walk(root_dirpath):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            if re.search(regex_pattern, filename):
                filepaths.append(filepath)
    return filepaths
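
Example (a sketch; the root directory is hypothetical):

from earthcarekit import search_files_by_regex  # assumed import path

files = search_files_by_regex("/data/earthcare", r"ATL_NOM_1B.*\.h5$")  # all ATL_NOM_1B granules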

search_product

search_product(
    root_dirpath=None,
    config=None,
    file_type=None,
    agency=None,
    latency=None,
    timestamp=None,
    baseline=None,
    orbit_and_frame=None,
    orbit_number=None,
    frame_id=None,
    filename=None,
    start_time=None,
    end_time=None,
)

Searches for EarthCARE product files matching given metadata filters.

Parameters:

root_dirpath (str): Root directory to search. Defaults to the directory given in a configuration file.
config (str | ECKConfig | None): Path to a config.toml file or an ECKConfig instance. Defaults to the default configuration file path.
file_type (str | Sequence[str]): Product file type(s) to match. Defaults to None.
agency (str | Sequence[str]): Producing agency or agencies (e.g. "ESA" or "JAXA"). Defaults to None.
latency (str | Sequence[str]): Data latency level(s). Defaults to None.
timestamp (TimestampLike | Sequence): Timestamp(s) included in the product's time coverage. Defaults to None.
baseline (str | Sequence[str]): Baseline version(s). Defaults to None.
orbit_and_frame (str | Sequence[str]): Orbit and frame identifiers. Defaults to None.
orbit_number (int | str | Sequence): Orbit number(s). Defaults to None.
frame_id (str | Sequence[str]): Frame identifier(s). Defaults to None.
filename (str | Sequence[str]): Specific filename(s) or regular expression patterns to match. Defaults to None.
start_time (TimestampLike): First timestamp included in the product's time coverage. Defaults to None.
end_time (TimestampLike): Last timestamp included in the product's time coverage. Defaults to None.

Returns:

ProductDataFrame: Filtered list of matching product files as a pandas.DataFrame-based object.

Raises:

FileNotFoundError: If the root directory does not exist.
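
Example (a sketch; it assumes a configured data directory, otherwise pass root_dirpath explicitly):

from earthcarekit import search_product  # assumed import path

df = search_product(
    file_type="ATL_NOM_1B",
    orbit_number=1234,
    frame_id="E",
)
df["filepath"]  # paths of the matching product files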

Source code in earthcarekit/utils/read/product/_search.py
def search_product(
    root_dirpath: str | None = None,
    config: str | ECKConfig | None = None,
    file_type: str | Sequence[str] | None = None,
    agency: str | Sequence[str] | None = None,
    latency: str | Sequence[str] | None = None,
    timestamp: TimestampLike | Sequence[TimestampLike] | None = None,
    baseline: str | Sequence[str] | None = None,
    orbit_and_frame: str | Sequence[str] | None = None,
    orbit_number: int | str | Sequence[int | str] | None = None,
    frame_id: str | Sequence[str] | None = None,
    filename: str | Sequence[str] | None = None,
    start_time: TimestampLike | None = None,
    end_time: TimestampLike | None = None,
) -> ProductDataFrame:
    """
    Searches for EarthCARE product files matching given metadata filters.

    Args:
        root_dirpath (str, optional): Root directory to search. Defaults to the directory given in a configuration file.
        config (str | ECKConfig | None, optional): Path to a `config.toml` file or an ECKConfig instance. Defaults to the default configuration file path.
        file_type (str | Sequence[str], optional): Product file type(s) to match.
        agency (str | Sequence[str], optional): Producing agency or agencies (e.g. "ESA" or "JAXA").
        latency (str | Sequence[str], optional): Data latency level(s).
        timestamp (TimestampLike | Sequence, optional): Timestamp(s) included in the product's time coverage.
        baseline (str | Sequence[str], optional): Baseline version(s).
        orbit_and_frame (str | Sequence[str], optional): Orbit and frame identifiers.
        orbit_number (int | str | Sequence[int | str], optional): Orbit number(s).
        frame_id (str | Sequence[str], optional): Frame identifier(s).
        filename (str | Sequence[str], optional): Specific filename(s) or regular expression patterns to match.
        start_time (TimestampLike, optional): First timestamp included in the product's time coverage.
        end_time (TimestampLike, optional): Last timestamp included in the product's time coverage.

    Returns:
        ProductDataFrame: Filtered list of matching product files as a `pandas.DataFrame`-based object.

    Raises:
        FileNotFoundError: If the root directory does not exist.
    """
    if not isinstance(root_dirpath, str):
        if isinstance(config, ECKConfig):
            root_dirpath = config.path_to_data
        else:
            root_dirpath = read_config(config).path_to_data

    if not os.path.exists(root_dirpath):
        raise FileNotFoundError(f"Given root directory does not exist: {root_dirpath}")

    mission_id = "ECA"

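    # Support a combined "FILE_TYPE:BASELINE" shorthand: split such entries so
    # that each file type is paired with its own baseline; entries without one
    # use the baseline argument or fall back to "latest".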
    if isinstance(file_type, str):
        file_type = [file_type]
    if isinstance(file_type, Sequence):
        _baseline: list[str] = []
        _file_type: list[str] = []
        for i, ft in enumerate(file_type):
            if isinstance(ft, str):
                _parts = ft.split(":")
                if len(_parts) == 2:
                    _file_type.append(_parts[0])
                    _baseline.append(_parts[1])
                    continue
            _file_type.append(ft)
            if isinstance(baseline, str):
                _baseline.append(baseline)
            elif isinstance(baseline, Sequence):
                try:
                    _baseline.append(baseline[i])
                except IndexError as e:
                    raise IndexError(e, "given baseline list is too short")
            else:
                _baseline.append("latest")
        file_type = _file_type
        baseline = _baseline
    file_type = _to_file_info_list(file_type, FileType)
    baseline = _format_input(
        baseline,
        file_types=file_type,
        default_input="..",
        format_func=validate_baseline,
    )
    baseline_and_file_type_list = [f"{bl}_{ft}" for bl, ft in zip(baseline, file_type)]
    baseline_and_file_type = _list_to_regex(
        baseline_and_file_type_list, ".._..._..._.."
    )

    agency = _to_file_info_list(agency, FileAgency)
    agency = _list_to_regex(agency, ".")

    latency = _to_file_info_list(latency, FileLatency)
    latency = _list_to_regex(latency, ".")

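    # Collect every timestamp that must lie within a product's time coverage.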
    timestamp = _format_input(timestamp, format_func=to_timestamp)
    _start_time = [] if start_time is None else [to_timestamp(start_time)]
    _end_time = [] if end_time is None else [to_timestamp(end_time)]
    timestamp = timestamp + _start_time + _end_time

    orbit_and_frame = _format_input(orbit_and_frame, format_func=format_orbit_and_frame)
    orbit_and_frame = _list_to_regex(orbit_and_frame, "." * 6)

    orbit_number = _format_input(orbit_number, format_func=format_orbit_number)
    orbit_number = _list_to_regex(orbit_number, "." * 5)

    frame_id = _format_input(frame_id, format_func=format_frame_id)
    frame_id = _list_to_regex(frame_id, ".")

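    # Build the orbit-and-frame part of the pattern: combine orbit_number and
    # frame_id when given, and accept an explicitly passed orbit_and_frame
    # pattern as an alternative match.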
    oaf_list = []
    oaf = ""
    if orbit_number != "." * 5:
        oaf_list.append(orbit_number)
    if frame_id != ".":
        oaf_list.append(frame_id)
    if orbit_number != "." * 5 or frame_id != ".":
        oaf = f"{orbit_number}{frame_id}"

    if oaf == "":
        oaf = orbit_and_frame
    elif oaf != "" and orbit_and_frame != "." * 6:
        oaf = f"(({oaf})|{orbit_and_frame})"

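    # Assemble the full filename regex following the EarthCARE naming scheme:
    # ECA_<agency><latency><baseline>_<file_type>_<timestamp>_<timestamp>_<orbit+frame>.h5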
    pattern = f".*{mission_id}_{agency}{latency}{baseline_and_file_type}_........T......Z_........T......Z_{oaf}.h5"

    # pattern = search_pattern(
    #     file_type=file_type,
    #     agency=agency,
    #     latency=latency,
    #     timestamp=timestamp,
    #     baseline=baseline,
    #     orbit_and_frame=orbit_and_frame,
    #     orbit_number=orbit_number,
    #     frame_id=frame_id,
    # )

    if pattern == ".*ECA_...._..._..._.._........T......Z_........T......Z_.......h5":
        files = []
    else:
        files = search_files_by_regex(root_dirpath, pattern)

    if isinstance(filename, (str, Sequence)):
        if isinstance(filename, str):
            filename = [filename]
        _get_pattern = lambda fn: f".*{os.path.basename(fn).replace('.h5', '')}.*.h5"
        filename = [_get_pattern(fn) for fn in filename]
    elif filename is None:
        filename = []
    else:
        raise TypeError(
            f"Given filename has invalid type ({type(filename)}: {filename})"
        )

    for fn in filename:
        new_files = search_files_by_regex(root_dirpath, fn)
        files.extend(new_files)

    # Remove duplicates
    files = list(set(files))

    old_files = files.copy()
    if len(timestamp) > 0:
        files = []
        for t in timestamp:
            new_files = [
                f for f in old_files if _check_product_contains_timestamp(f, t)
            ]
            if len(new_files) > 0:
                files.extend(new_files)

    pdf = get_product_infos(files)

    if start_time is not None or end_time is not None:
        _pdf = get_product_infos(old_files)
        _pdf = filter_time_range(_pdf, start_time=start_time, end_time=end_time)

        if not pdf.empty and not _pdf.empty:
            pdf = ProductDataFrame(pd.concat([pdf, _pdf], ignore_index=True))
        elif not _pdf.empty:
            pdf = _pdf

    pdf = pdf.sort_values(by=["orbit_and_frame", "file_type", "start_processing_time"])
    pdf = pdf.drop_duplicates()
    pdf = pdf.reset_index(drop=True)

    pdf.validate_columns()
    return pdf
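
As the source shows, file_type entries may also carry a baseline in the combined "FILE_TYPE:BASELINE" form, split on ":" before matching; entries without one take the baseline argument or fall back to "latest". A sketch with illustrative values (the baseline "AC" and the time window are assumptions, not taken from the package):

from earthcarekit.read import search_product

# Mix a pinned baseline with the implicit "latest" and restrict the search
# to a one-day time window (assuming TimestampLike accepts ISO 8601 strings).
pdf = search_product(
    file_type=["ATL_NOM_1B:AC", "MSI_RGR_1C"],
    start_time="2024-08-01T00:00:00",
    end_time="2024-08-02T00:00:00",
)

Omitting root_dirpath here makes the function fall back to the data path of the configuration file, as the first lines of the source show.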