Skip to content

API reference

earthcarekit.read

earthcarekit.read

Reading utilities for EarthCARE product data.


FileAgency

Bases: FileInfoEnum

Source code in earthcarekit/utils/read/product/file_info/agency.py
class FileAgency(FileInfoEnum):
    """Enum of agencies producing EarthCARE files, keyed by single-letter file name codes."""

    ESA = "E"
    JAXA = "J"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileAgency":
        """Infers the EarthCARE product agency (i.e. ESA or JAXA) from a given file or dataset."""
        if isinstance(input, str):
            # Try lookup by member name first (e.g. "ESA"), then by value (e.g. "E").
            # The two duplicate silent handlers were merged into one tuple handler.
            try:
                return cls[input.upper()]
            except (AttributeError, KeyError):
                pass
            try:
                return cls(input.upper())
            except ValueError:
                pass

        # Fall back to inferring the agency from the file path / dataset metadata.
        return get_file_agency(input)

from_input classmethod

from_input(input)

Infers the EarthCARE product agency (i.e. ESA or JAXA) from a given file or dataset.

Source code in earthcarekit/utils/read/product/file_info/agency.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileAgency":
    """Infers the EarthCARE product agency (i.e. ESA or JAXA) from a given file or dataset."""
    if isinstance(input, str):
        key = input.upper()
        # First attempt an enum-name lookup, then an enum-value lookup.
        try:
            return cls[key]
        except (AttributeError, KeyError):
            pass
        try:
            return cls(key)
        except ValueError:
            pass
    # Not a recognizable string: infer from the file path / dataset instead.
    return get_file_agency(input)

FileLatency

Bases: FileInfoEnum

Source code in earthcarekit/utils/read/product/file_info/latency.py
class FileLatency(FileInfoEnum):
    """Enum of EarthCARE product latency indicators, keyed by single-letter file name codes."""

    NEAR_REAL_TIME = "N"
    OFFLINE = "O"
    NOT_APPLICABLE = "X"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileLatency":
        """Infers the EarthCARE product latency indicator (i.e. N for Near-real time, O for Offline, X for not applicable) from a given name, file or dataset."""
        if isinstance(input, str):
            # Try lookup by member name first (e.g. "OFFLINE"), then by value (e.g. "O").
            # The two duplicate silent handlers were merged into one tuple handler.
            try:
                return cls[input.upper()]
            except (AttributeError, KeyError):
                pass
            try:
                return cls(input.upper())
            except ValueError:
                pass

        # Fall back to inferring the latency from the file path / dataset metadata.
        return get_file_latency(input)

from_input classmethod

from_input(input)

Infers the EarthCARE product latency indicator (i.e. N for Near-real time, O for Offline, X for not applicable) from a given name, file or dataset.

Source code in earthcarekit/utils/read/product/file_info/latency.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileLatency":
    """Infers the EarthCARE product latency indicator (i.e. N for Near-real time, O for Offline, X for not applicable) from a given name, file or dataset."""
    if isinstance(input, str):
        key = input.upper()
        # First attempt an enum-name lookup, then an enum-value lookup.
        try:
            return cls[key]
        except (AttributeError, KeyError):
            pass
        try:
            return cls(key)
        except ValueError:
            pass
    # Not a recognizable string: infer from the file path / dataset instead.
    return get_file_latency(input)

FileType

Bases: FileInfoEnum

Source code in earthcarekit/utils/read/product/file_info/type.py
class FileType(FileInfoEnum):
    """Enum of EarthCARE product types (10-character file type strings)."""

    # Level 1
    ATL_NOM_1B = "ATL_NOM_1B"
    ATL_DCC_1B = "ATL_DCC_1B"
    ATL_CSC_1B = "ATL_CSC_1B"
    ATL_FSC_1B = "ATL_FSC_1B"
    MSI_NOM_1B = "MSI_NOM_1B"
    MSI_BBS_1B = "MSI_BBS_1B"
    MSI_SD1_1B = "MSI_SD1_1B"
    MSI_SD2_1B = "MSI_SD2_1B"
    MSI_RGR_1C = "MSI_RGR_1C"
    BBR_NOM_1B = "BBR_NOM_1B"
    BBR_SNG_1B = "BBR_SNG_1B"
    BBR_SOL_1B = "BBR_SOL_1B"
    BBR_LIN_1B = "BBR_LIN_1B"
    CPR_NOM_1B = "CPR_NOM_1B"  # JAXA product
    # Level 2a
    ATL_FM__2A = "ATL_FM__2A"
    ATL_AER_2A = "ATL_AER_2A"
    ATL_ICE_2A = "ATL_ICE_2A"
    ATL_TC__2A = "ATL_TC__2A"
    ATL_EBD_2A = "ATL_EBD_2A"
    ATL_CTH_2A = "ATL_CTH_2A"
    ATL_ALD_2A = "ATL_ALD_2A"
    MSI_CM__2A = "MSI_CM__2A"
    MSI_COP_2A = "MSI_COP_2A"
    MSI_AOT_2A = "MSI_AOT_2A"
    CPR_FMR_2A = "CPR_FMR_2A"
    CPR_CD__2A = "CPR_CD__2A"
    CPR_TC__2A = "CPR_TC__2A"
    CPR_CLD_2A = "CPR_CLD_2A"
    CPR_APC_2A = "CPR_APC_2A"
    ATL_CLA_2A = "ATL_CLA_2A"  # JAXA product
    MSI_CLP_2A = "MSI_CLP_2A"  # JAXA product
    CPR_ECO_2A = "CPR_ECO_2A"  # JAXA product
    CPR_CLP_2A = "CPR_CLP_2A"  # JAXA product
    # Level 2b
    AM__MO__2B = "AM__MO__2B"
    AM__CTH_2B = "AM__CTH_2B"
    AM__ACD_2B = "AM__ACD_2B"
    AC__TC__2B = "AC__TC__2B"
    BM__RAD_2B = "BM__RAD_2B"
    BMA_FLX_2B = "BMA_FLX_2B"
    ACM_CAP_2B = "ACM_CAP_2B"
    ACM_COM_2B = "ACM_COM_2B"
    ACM_RT__2B = "ACM_RT__2B"
    ALL_DF__2B = "ALL_DF__2B"
    ALL_3D__2B = "ALL_3D__2B"
    AC__CLP_2B = "AC__CLP_2B"  # JAXA product
    ACM_CLP_2B = "ACM_CLP_2B"  # JAXA product
    ALL_RAD_2B = "ALL_RAD_2B"  # JAXA product
    # Auxiliary data
    AUX_MET_1D = "AUX_MET_1D"
    AUX_JSG_1D = "AUX_JSG_1D"
    # Orbit data
    MPL_ORBSCT = "MPL_ORBSCT"
    AUX_ORBPRE = "AUX_ORBPRE"
    AUX_ORBRES = "AUX_ORBRES"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileType":
        """Infers the EarthCARE product type from a given file or dataset."""
        if isinstance(input, str):
            # Try lookup by member name first, then by value. The previously
            # duplicated/split silent handlers are merged into tuple handlers;
            # KeyError is kept on the second attempt in case
            # `format_file_type_string` raises it for unknown inputs.
            try:
                return cls[format_file_type_string(input)]
            except (AttributeError, KeyError):
                pass
            try:
                return cls(format_file_type_string(input))
            except (ValueError, KeyError):
                pass

        # Fall back to inferring the type from the file path / dataset metadata.
        return get_file_type(input)

    @classmethod
    def list(cls) -> "list[str]":
        """Returns all file type values as a list of strings."""
        return [member.value for member in cls]

    def to_shorthand(self, with_dash: bool = False) -> str:
        """Returns the shorthand name of this file type, with or without the dash separator."""
        shorthand = _short_hand_map[self.value]
        if with_dash:
            return shorthand
        return shorthand.replace("-", "")

    def get_level(self) -> Literal["1B", "1C", "2A", "2B", "1D", "ORB"]:
        """Returns the processing level encoded in the file type (e.g. "2A"), or "ORB" for orbit files."""
        if self.value[-2:] in ["1B", "1C", "1D", "2A", "2B"]:
            return self.value[-2:]  # type: ignore
        elif self.value in [
            FileType.MPL_ORBSCT.value,
            FileType.AUX_ORBPRE.value,
            FileType.AUX_ORBRES.value,
        ]:
            return "ORB"
        raise NotImplementedError(f"missing implementation for {self}")

from_input classmethod

from_input(input)

Infers the EarthCARE product type from a given file or dataset.

Source code in earthcarekit/utils/read/product/file_info/type.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileType":
    """Infers the EarthCARE product type from a given file or dataset."""
    if isinstance(input, str):
        # First attempt an enum-name lookup, then an enum-value lookup; the
        # formatting helper stays inside each try so its errors are swallowed
        # exactly as before.
        try:
            return cls[format_file_type_string(input)]
        except (AttributeError, KeyError):
            pass
        try:
            return cls(format_file_type_string(input))
        except (ValueError, KeyError):
            pass
    # Not a recognizable string: infer from the file path / dataset instead.
    return get_file_type(input)

ProductInfo dataclass

Class storing all info gathered from an EarthCARE product's file path.

Attributes:

Name Type Description
mission_id FileMissionID

Mission ID (ECA = EarthCARE).

agency FileAgency

Agency that generated the file (E = ESA, J = JAXA).

latency FileLatency

Latency indicator (X = not applicable, N = near real-time, O = offline).

baseline str

Two-letter product/processor version string (e.g., "BA").

file_type FileType

Full product name (10 characters, e.g., "ATL_EBD_2A").

start_sensing_time Timestamp

Start-time of data collection (i.e., time of first available data in the product).

start_processing_time Timestamp

Start-time of processing (i.e., time at which creation of the product started).

orbit_number int

Number of the orbit.

frame_id str

Single-letter identifier between A and H, indicating the orbit segment (A,B,H = night frames; D,E,F = day frames; C,G = polar day/night frames).

orbit_and_frame str

Six-character string with leading zeros combining orbit number and frame ID.

name str

Full name of the product without file extension.

filepath str

Local file path or empty string if not available.

hdr_filepath str

Local header file path or empty string if not available.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
@dataclass
class ProductInfo:
    """
    Container for all information parsed from an EarthCARE product's file path.

    Attributes:
        mission_id (FileMissionID):
            Mission ID (ECA = EarthCARE).
        agency (FileAgency):
            Agency that generated the file (E = ESA, J = JAXA).
        latency (FileLatency):
            Latency indicator (X = not applicable, N = near real-time, O = offline).
        baseline (str):
            Two-letter product/processor version string (e.g., "BA").
        file_type (FileType):
            Full product name (10 characters, e.g., "ATL_EBD_2A").
        start_sensing_time (pd.Timestamp):
            Start-time of data collection, i.e. the time of the first available
            data in the product.
        start_processing_time (pd.Timestamp):
            Start-time of processing, i.e. the time at which creation of the
            product started.
        orbit_number (int):
            Number of the orbit.
        frame_id (str):
            Single-letter identifier between A and H, indicating the orbit segment
            (A,B,H = night frames; D,E,F = day frames; C,G = polar day/night frames).
        orbit_and_frame (str):
            Six-character string with leading zeros combining orbit number and frame ID.
        name (str):
            Full name of the product without file extension.
        filepath (str):
            Local file path, or an empty string if not available.
        hdr_filepath (str):
            Local header file path, or an empty string if not available.
    """

    mission_id: FileMissionID
    agency: FileAgency
    latency: FileLatency
    baseline: str
    file_type: FileType
    start_sensing_time: pd.Timestamp
    start_processing_time: pd.Timestamp
    orbit_number: int
    frame_id: str
    orbit_and_frame: str
    name: str
    filepath: str
    hdr_filepath: str

    def to_dict(self) -> dict:
        """Returns all parsed product info fields as a Python `dict`."""
        return asdict(self)

    def to_dataframe(self) -> "ProductDataFrame":
        """Returns this product info as a single-row `ProductDataFrame` (a `pandas.DataFrame`)."""
        return ProductDataFrame([self])

to_dataframe

to_dataframe()

Returns product info as a pandas.DataFrame.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
def to_dataframe(self) -> "ProductDataFrame":
    """Returns this product info as a single-row `ProductDataFrame` (a `pandas.DataFrame`)."""
    records = [self]
    return ProductDataFrame(records)

to_dict

to_dict()

Returns product info as a Python dict.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
def to_dict(self) -> dict:
    """Returns all product info fields as a plain Python `dict`."""
    return asdict(self)

add_depol_ratio

add_depol_ratio(
    ds_anom,
    rolling_w=20,
    near_zero_tolerance=2e-07,
    smooth=True,
    skip_height_above_elevation=300,
    cpol_var="mie_attenuated_backscatter",
    xpol_var="crosspolar_attenuated_backscatter",
    elevation_var=ELEVATION_VAR,
    height_var=HEIGHT_VAR,
    height_dim=VERTICAL_DIM,
)

Compute depolarization ratio (DPOL = XPOL/CPOL) from attenuated backscatter signals.

This function derives the depol. ratio from cross-polarized (XPOL) and co-polarized (CPOL) attenuated backscatter signals. Signals below the surface are masked, by default with a vertical margin of 300 meters above elevation to remove potential surface return. Also, signals are smoothed (or "cleaned") with a rolling mean, and near-zero divisions are suppressed and set to NaN instead. The resulting dataset contains the ratio curtain and a ratio profile calculated from mean profiles of the full dataset (e.g., mean(XPOL)/mean(CPOL)).

Parameters:

Name Type Description Default
ds_anom Dataset

ATL_NOM_1B dataset containing cross- and co-polar attenuated backscatter.

required
rolling_w int

Window size for rolling mean smoothing. Defaults to 20.

20
near_zero_tolerance float

Tolerance for masking near-zero CPOL (i.e., denominators). Defaults to 2e-7.

2e-07
smooth bool

Whether to apply rolling mean smoothing. Defaults to True.

True
skip_height_above_elevation int

Vertical margin above surface elevation to mask in meters. Defaults to 300.

300
cpol_var str

Input co-polar variable name. Defaults to "mie_attenuated_backscatter".

'mie_attenuated_backscatter'
xpol_var str

Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".

'crosspolar_attenuated_backscatter'
elevation_var str

Elevation variable name. Defaults to ELEVATION_VAR.

ELEVATION_VAR
height_var str

Height variable name. Defaults to HEIGHT_VAR.

HEIGHT_VAR
height_dim str

Height dimension name. Defaults to VERTICAL_DIM.

VERTICAL_DIM

Returns:

Type Description
Dataset

xr.Dataset: Dataset with added depol. ratio, cleaned signals, and depol. ratio profile from mean profiles.

Source code in earthcarekit/utils/read/product/level1/atl_nom_1b.py
def add_depol_ratio(
    ds_anom: xr.Dataset,
    rolling_w: int = 20,
    near_zero_tolerance: float = 2e-7,
    smooth: bool = True,
    skip_height_above_elevation: int = 300,
    cpol_var: str = "mie_attenuated_backscatter",
    xpol_var: str = "crosspolar_attenuated_backscatter",
    elevation_var: str = ELEVATION_VAR,
    height_var: str = HEIGHT_VAR,
    height_dim: str = VERTICAL_DIM,
) -> xr.Dataset:
    """
    Compute depolarization ratio (`DPOL` = `XPOL`/`CPOL`) from attenuated backscatter signals.

    Derives the depol. ratio from the cross-polarized (`XPOL`) and co-polarized
    (`CPOL`) attenuated backscatter signals. Signals below the surface are masked,
    by default with a vertical margin of 300 meters above elevation to remove
    potential surface return. Signals are optionally smoothed ("cleaned") with a
    rolling mean, and near-zero divisions are suppressed and set to NaN. The
    resulting dataset contains the ratio curtain and a ratio profile calculated
    from mean profiles of the full dataset (e.g., mean(`XPOL`)/mean(`CPOL`)).

    Args:
        ds_anom (xr.Dataset): ATL_NOM_1B dataset containing cross- and co-polar attenuated backscatter.
        rolling_w (int, optional): Window size for rolling mean smoothing. Defaults to 20.
        near_zero_tolerance (float, optional): Tolerance for masking near-zero `CPOL` (i.e., denominators). Defaults to 2e-7.
        smooth (bool, optional): Whether to apply rolling mean smoothing. Defaults to True.
        skip_height_above_elevation (int, optional): Vertical margin above surface elevation to mask in meters. Defaults to 300.
        cpol_var (str, optional): Input co-polar variable name. Defaults to "mie_attenuated_backscatter".
        xpol_var (str, optional): Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".
        elevation_var (str, optional): Elevation variable name. Defaults to ELEVATION_VAR.
        height_var (str, optional): Height variable name. Defaults to HEIGHT_VAR.
        height_dim (str, optional): Height dimension name. Defaults to VERTICAL_DIM.

    Returns:
        xr.Dataset: Dataset with added depol. ratio, cleaned signals, and depol. ratio profile from mean profiles.
    """
    # Thin wrapper: the depol. ratio is the "x/c" special case of the generic
    # scattering-ratio computation; all other options are forwarded unchanged.
    return add_scattering_ratio(
        ds_anom=ds_anom,
        formula="x/c",
        rolling_w=rolling_w,
        near_zero_tolerance=near_zero_tolerance,
        smooth=smooth,
        skip_height_above_elevation=skip_height_above_elevation,
        cpol_var=cpol_var,
        xpol_var=xpol_var,
        elevation_var=elevation_var,
        height_var=height_var,
        height_dim=height_dim,
    )

add_isccp_cloud_type

add_isccp_cloud_type(
    ds,
    new_var="isccp_cloud_type",
    cot_var="cloud_optical_thickness",
    cth_var="cloud_top_height",
    along_track_dim=ALONG_TRACK_DIM,
    across_track_dim=ACROSS_TRACK_DIM,
)

Adds a variable to the dataset containing ISCCP cloud types calculated from cloud optical thickness (COT) and cloud top height (CTH).

Parameters:

Name Type Description Default
ds Dataset

A MSI_COP_2A dataset.

required
new_var str

Name of the new ISCCP cloud type variable. Defaults to "isccp_cloud_type".

'isccp_cloud_type'
cot_var str

Name of the COT variable in ds. Defaults to "cloud_optical_thickness".

'cloud_optical_thickness'
cth_var str

Name of the CTH variable in ds. Defaults to "cloud_top_height".

'cloud_top_height'
along_track_dim str

Name of the along-track dimension in ds. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
across_track_dim str

Name of the across-track dimension in ds. Defaults to ACROSS_TRACK_DIM.

ACROSS_TRACK_DIM

Returns:

Type Description
Dataset

xr.Dataset: The input dataset with added ISCCP cloud type variable.

References
  • International Satellite Cloud Climatology Project (ISCCP). ISCCP Definition of Cloud Types. Retrieved September 25, 2025. https://isccp.giss.nasa.gov/cloudtypes.html
Source code in earthcarekit/utils/read/product/level2a/msi_cop_2a.py
def add_isccp_cloud_type(
    ds: xr.Dataset,
    new_var: str = "isccp_cloud_type",
    cot_var: str = "cloud_optical_thickness",
    cth_var: str = "cloud_top_height",
    along_track_dim: str = ALONG_TRACK_DIM,
    across_track_dim: str = ACROSS_TRACK_DIM,
) -> xr.Dataset:
    """
    Adds a variable to the dataset containing ISCCP cloud types derived from
    cloud optical thickness (COT) and cloud top height (CTH).

    Args:
        ds (xr.Dataset): A MSI_COP_2A dataset.
        new_var (str, optional): Name of the new ISCCP cloud type variable. Defaults to "isccp_cloud_type".
        cot_var (str, optional): Name of the COT variable in `ds`. Defaults to "cloud_optical_thickness".
        cth_var (str, optional): Name of the CTH variable in `ds`. Defaults to "cloud_top_height".
        along_track_dim (str, optional): Name of the along-track dimension in `ds`. Defaults to ALONG_TRACK_DIM.
        across_track_dim (str, optional): Name of the across-track dimension in `ds`. Defaults to ACROSS_TRACK_DIM.

    Returns:
        xr.Dataset: The input dataset with added ISCCP cloud type variable.

    References:
        - International Satellite Cloud Climatology Project (ISCCP). ISCCP Definition of Cloud Types.
        Retrieved September 25, 2025. https://isccp.giss.nasa.gov/cloudtypes.html
    """
    cot = ds[cot_var].values
    cth = ds[cth_var].values

    # Start with "not determined" everywhere; pixels matching no bin below
    # (e.g. NaN inputs, out-of-range heights) keep this fill value.
    cloud_type = np.full(cot.shape, -127, dtype=int)

    # ISCCP classification matrix: COT bins (rows) x CTH bins in meters (columns).
    # An upper COT bound of None means "no upper limit".
    cth_bins = ((100, 3200), (3200, 6500), (6500, 19300))
    cot_bins = ((0.01, 3.6), (3.6, 23), (23, None))
    type_codes = (
        (1, 2, 3),  # thin:   Cumulus, Altocumulus, Cirrus
        (4, 5, 6),  # medium: Stratocumulus, Altostratus, Cirrostratus
        (7, 8, 9),  # thick:  Stratus, Nimbostratus, Deep convection
    )
    for (cot_lo, cot_hi), row in zip(cot_bins, type_codes):
        cot_sel = cot >= cot_lo
        if cot_hi is not None:
            cot_sel = cot_sel & (cot < cot_hi)
        for (cth_lo, cth_hi), code in zip(cth_bins, row):
            cloud_type[cot_sel & (cth >= cth_lo) & (cth < cth_hi)] = code

    # Clear sky: near-zero but non-negative optical thickness.
    cloud_type[(cot < 0.01) & (cot >= 0)] = 0

    da = xr.DataArray(
        cloud_type,
        dims=(along_track_dim, across_track_dim),
        name=new_var,
        attrs={
            "units": "",
            "long_name": "ISCCP cloud type calculated from M-COP",
            "definition": "0: Clear, 1: Cumulus, 2: Altocumulus, 3: Cirrus, 4: Stratocumulus, 5: Altostratus, 6: Cirrostratus, 7: Stratus, 8: Nimbostratus, 9: Deep convection, -127: Not determined",
            "earthcarekit": "Added by earthcarekit",
        },
    )
    ds[new_var] = da

    return ds

add_potential_temperature

add_potential_temperature(
    ds,
    temperature_var="temperature_kelvin",
    pressure_var="pressure",
    new_var="potential_temperature",
)

Computes potential temperature from temperature [K] and pressure [Pa] and adds it as a variable to the dataset (source: https://en.wikipedia.org/wiki/Potential_temperature, accessed: 2026-02-06).

Parameters:

Name Type Description Default
ds Dataset

Dataset (e.g., AUX_MET_1D) containing temperature [K] and pressure [Pa] data.

required
temperature_var str

Input temperature variable name. Defaults to "temperature_kelvin".

'temperature_kelvin'
pressure_var str

Input pressure variable name. Defaults to "pressure".

'pressure'
new_var str

New variable name for potential temperature. Defaults to "potential_temperature".

'potential_temperature'

Returns:

Type Description
Dataset

xr.Dataset: Dataset with 2 new variables for potential temperature profiles added (kelvin and celsius).

Source code in earthcarekit/utils/read/product/auxiliary/aux_met_1d.py
def add_potential_temperature(
    ds: xr.Dataset,
    temperature_var: str = "temperature_kelvin",
    pressure_var: str = "pressure",
    new_var: str = "potential_temperature",
) -> xr.Dataset:
    """
    Computes potential temperature from temperature [K] and pressure [Pa] and adds it as a variable to the dataset (source: https://en.wikipedia.org/wiki/Potential_temperature, accessed: 2026-02-06).

    Args:
        ds (xr.Dataset): Dataset (e.g., AUX_MET_1D) containing temperature [K] and pressure [Pa] data.
        temperature_var (str, optional): Input temperature variable name. Defaults to "temperature_kelvin".
        pressure_var (str, optional): Input pressure variable name. Defaults to "pressure".
        new_var (str, optional): New variable name prefix for potential temperature. Defaults to "potential_temperature".

    Returns:
        xr.Dataset: Dataset with 2 new variables for potential temperature profiles added (`<new_var>_kelvin` and `<new_var>_celsius`).
    """
    t = ds[temperature_var].values  # [K]
    p = ds[pressure_var].values  # [Pa]
    p0 = 100_000.0  # reference pressure [Pa]
    rcp = 0.286  # R/c_p exponent for dry air
    # Use np.power instead of np.pow: np.pow is only available as an alias
    # since NumPy 2.0, so np.power keeps this working on older NumPy versions.
    potential_t = t * np.power(p0 / p, rcp)

    attrs = {
        "units": "K",
        "long_name": "Potential temperature",
        "name": "Potential temperature",
    }
    ds[f"{new_var}_kelvin"] = (
        ds[temperature_var].copy().drop_attrs().assign_attrs(attrs)
    )
    ds[f"{new_var}_kelvin"].values = potential_t
    # assign_attrs stored the items above on the kelvin variable, so mutating
    # the local dict here only affects the celsius variable created next.
    attrs["units"] = r"$^{\circ}$C"
    ds[f"{new_var}_celsius"] = (
        ds[temperature_var].copy().drop_attrs().assign_attrs(attrs)
    )
    ds[f"{new_var}_celsius"].values = potential_t - 273.15

    return ds

add_scattering_ratio

add_scattering_ratio(
    ds_anom,
    formula,
    rolling_w=20,
    near_zero_tolerance=2e-07,
    smooth=True,
    skip_height_above_elevation=300,
    cpol_var="mie_attenuated_backscatter",
    xpol_var="crosspolar_attenuated_backscatter",
    ray_var="rayleigh_attenuated_backscatter",
    elevation_var=ELEVATION_VAR,
    height_var=HEIGHT_VAR,
    height_dim=VERTICAL_DIM,
)

Compute scattering ratio from attenuated backscatter signals given a formula: "x/c", "(c+x)/r", or "(c+x+r)/r".

This function derives the scattering ratio from cross-polarized (XPOL), co-polarized (CPOL) and rayleigh (RAY) attenuated backscatter signals. Signals below the surface are masked, by default with a vertical margin of 300 meters above elevation to remove potential surface return. Also, signals are smoothed (or "cleaned") with a rolling mean, and near-zero divisions are suppressed and set to NaN instead. The resulting dataset contains the ratio curtain and a ratio profile calculated from mean profiles of the full dataset (e.g., mean(XPOL)/mean(CPOL)).

Parameters:

Name Type Description Default
ds_anom Dataset

ATL_NOM_1B dataset containing the attenuated backscatter signals.

required
formula Literal['x/c', '(c+x)/r', '(c+x+r)/r']

Formula used to calculate the scattering ratio.

required
rolling_w int

Window size for rolling mean smoothing. Defaults to 20.

20
near_zero_tolerance float

Tolerance for masking near-zero denominators. Defaults to 2e-7.

2e-07
smooth bool

Whether to apply rolling mean smoothing. Defaults to True.

True
skip_height_above_elevation int

Vertical margin above surface elevation to mask in meters. Defaults to 300.

300
cpol_var str

Input co-polar variable name. Defaults to "mie_attenuated_backscatter".

'mie_attenuated_backscatter'
xpol_var str

Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".

'crosspolar_attenuated_backscatter'
ray_var str

Input rayleigh variable name. Defaults to "rayleigh_attenuated_backscatter".

'rayleigh_attenuated_backscatter'
elevation_var str

Elevation variable name. Defaults to ELEVATION_VAR.

ELEVATION_VAR
height_var str

Height variable name. Defaults to HEIGHT_VAR.

HEIGHT_VAR
height_dim str

Height dimension name. Defaults to VERTICAL_DIM.

VERTICAL_DIM

Returns:

Type Description
Dataset

xr.Dataset: Dataset with added ratio curtain and ratio profile from mean profiles.

Source code in earthcarekit/utils/read/product/level1/atl_nom_1b.py
def add_scattering_ratio(
    ds_anom: xr.Dataset,
    formula: Literal["x/c", "(c+x)/r", "(c+x+r)/r"],
    rolling_w: int = 20,
    near_zero_tolerance: float = 2e-7,
    smooth: bool = True,
    skip_height_above_elevation: int = 300,
    cpol_var: str = "mie_attenuated_backscatter",
    xpol_var: str = "crosspolar_attenuated_backscatter",
    ray_var: str = "rayleigh_attenuated_backscatter",
    elevation_var: str = ELEVATION_VAR,
    height_var: str = HEIGHT_VAR,
    height_dim: str = VERTICAL_DIM,
) -> xr.Dataset:
    """
    Compute scattering ratio from attenuated backscatter signals given a formula: "x/c", "(c+x)/r", or "(c+x+r)/r".

    This function derives the scattering ratio from cross-polarized (`XPOL`), co-polarized (`CPOL`) and rayleigh (`RAY`) attenuated backscatter signals.
    Signals below the surface are masked, by default with a vertical margin of 300 meters above elevation to remove potential surface return.
    Also, signals are smoothed (or "cleaned") with a rolling mean, and near-zero divisions are suppressed and set to NaN instead.
    The resulting dataset contains the ratio curtain and a ratio profile calculated from mean profiles of the full dataset (e.g., mean(`XPOL`)/mean(`CPOL`)).

    Note:
        NOTE(review): The surface masking below writes NaNs into the arrays
        returned by `.data`, which appear to be the dataset's own backing
        arrays - i.e. `ds_anom`'s input variables are likely modified in
        place. Confirm this is intended for callers that reuse `ds_anom`.

    Args:
        ds_anom (xr.Dataset): ATL_NOM_1B dataset containing the attenuated backscatter signals.
        formula (Literal["x/c", "(c+x)/r", "(c+x+r)/r"]): Formula used to calculate the scattering ratio.
        rolling_w (int, optional): Window size for rolling mean smoothing. Defaults to 20.
        near_zero_tolerance (float, optional): Tolerance for masking near-zero denominators. Defaults to 2e-7.
        smooth (bool, optional): Whether to apply rolling mean smoothing. Defaults to True.
        skip_height_above_elevation (int, optional): Vertical margin above surface elevation to mask in meters. Defaults to 300.
        cpol_var (str, optional): Input co-polar variable name. Defaults to "mie_attenuated_backscatter".
        xpol_var (str, optional): Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".
        ray_var (str, optional): Input rayleigh variable name. Defaults to "rayleigh_attenuated_backscatter".
        elevation_var (str, optional): Elevation variable name. Defaults to ELEVATION_VAR.
        height_var (str, optional): Height variable name. Defaults to HEIGHT_VAR.
        height_dim (str, optional): Height dimension name. Defaults to VERTICAL_DIM.

    Raises:
        ValueError: If `formula` is not one of "x/c", "(c+x)/r" or "(c+x+r)/r".

    Returns:
        xr.Dataset: Dataset with added ratio curtain and ratio profile from mean profiles.
    """

    # Validate the formula up front so the dispatch helpers below can rely on it.
    if formula.lower() not in ["x/c", "(c+x)/r", "(c+x+r)/r"]:
        raise ValueError(
            f"invalid formula '{formula}', expected 'x/c', '(c+x)/r' or '(c+x+r)/r'"
        )

    # Names of the intermediate "cleaned" (masked/smoothed) signal variables
    # that get stored alongside the ratio for later inspection/plotting.
    cpol_cleaned_var: str = "cpol_cleaned_for_ratio_calculation"
    xpol_cleaned_var: str = "xpol_cleaned_for_ratio_calculation"
    ray_cleaned_var: str = "ray_cleaned_for_ratio_calculation"

    # Copies used only for the initial (unmasked, unsmoothed) ratio curtain.
    cpol_da = ds_anom[cpol_var].copy()
    xpol_da = ds_anom[xpol_var].copy()
    ray_da = ds_anom[ray_var].copy()
    # if formula == "x/c":
    #     ray_da = xpol_da
    # else:

    # Dispatch helpers: each closure switches on the validated `formula`.
    # Compute the selected ratio from co-polar (c), cross-polar (x), rayleigh (r).
    def _calc(c, x, r):
        if formula == "x/c":
            return x / c
        elif formula == "(c+x)/r":
            return (c + x) / r
        elif formula == "(c+x+r)/r":
            return (c + x + r) / r

    # Mask where the formula's denominator is within tolerance of zero.
    def _get_near_zero_mask(c, x, r):
        if formula == "x/c":
            return np.isclose(c, 0, atol=near_zero_tolerance)
        elif formula == "(c+x)/r":
            return np.isclose(r, 0, atol=near_zero_tolerance)
        elif formula == "(c+x+r)/r":
            return np.isclose(r, 0, atol=near_zero_tolerance)

    # Human-readable long_name for the output variable's attributes.
    def _get_long_name():
        if formula == "x/c":
            return "Depol. ratio from cross- and co-polar atten. part. bsc."
        elif formula == "(c+x)/r":
            return "Total part. to ray. atten. bsc. ratio"
        elif formula == "(c+x+r)/r":
            return "Total to ray. atten. bsc. ratio"

    # Dataset variable name for the computed ratio.
    def _get_ratio_var():
        if formula == "x/c":
            return "depol_ratio"
        elif formula == "(c+x)/r":
            return "cpol_xpol_to_ray_ratio"
        elif formula == "(c+x+r)/r":
            return "cpol_xpol_ray_to_ray_ratio"

    ratio_var = _get_ratio_var()
    ratio_from_means_var = f"{ratio_var}_from_means"

    # First pass: raw ratio curtain from the unmodified signal copies.
    ds_anom[ratio_var] = _calc(cpol_da, xpol_da, ray_da)
    rename_var_info(
        ds_anom,
        ratio_var,
        name=ratio_var,
        long_name=_get_long_name(),
        units="",
    )

    # Build a (profile, height) surface mask: True below elevation + margin.
    # NOTE(review): uses the height grid of the first profile (`data[0]`) for
    # all profiles - presumably the height axis is identical along track;
    # confirm against the product format.
    elevation = (
        ds_anom[elevation_var].data.copy()[:, np.newaxis] + skip_height_above_elevation
    )
    mask_surface = ds_anom[height_var].data[0].copy() < elevation

    # `.data` (no copy): the NaN assignments below mutate these arrays in place.
    cpol = ds_anom[cpol_var].data
    xpol = ds_anom[xpol_var].data
    ray = ds_anom[ray_var].data
    # if formula == "x/c":
    #     ray = xpol
    # else:

    # Remove sub-surface samples (incl. the safety margin) from all signals.
    cpol[mask_surface] = np.nan
    xpol[mask_surface] = np.nan
    ray[mask_surface] = np.nan

    if smooth:
        # Rolling mean along track (axis 0) rebinds the local names to new
        # arrays; the in-place surface masking above has already happened.
        cpol = rolling_mean_2d(cpol, rolling_w, axis=0)
        xpol = rolling_mean_2d(xpol, rolling_w, axis=0)
        ray = rolling_mean_2d(ray, rolling_w, axis=0)

    # Second pass: overwrite the ratio curtain with the masked/smoothed version.
    ds_anom[ratio_var].data = _calc(cpol, xpol, ray)
    ds_anom[ratio_var] = ds_anom[ratio_var].assign_attrs(
        {
            "earthcarekit": "Added by earthcarekit: Intended for use in curtain plots only!",
        }
    )

    if smooth:
        # Suppress near-zero divisions: NaN-out the ratio and all cleaned
        # signals wherever the denominator is (numerically) zero.
        near_zero_mask = _get_near_zero_mask(cpol, xpol, ray)
        ds_anom[ratio_var].data[near_zero_mask] = np.nan
        cpol[near_zero_mask] = np.nan
        xpol[near_zero_mask] = np.nan
        ray[near_zero_mask] = np.nan

    # Store the cleaned signals as separate variables for inspection.
    ds_anom[xpol_cleaned_var] = ds_anom[xpol_var].copy()
    ds_anom[xpol_cleaned_var].data = xpol
    ds_anom[xpol_cleaned_var] = ds_anom[xpol_cleaned_var].assign_attrs(
        {
            "earthcarekit": f"Added by earthcarekit: Rolling mean applied (w={rolling_w}) and near-zero values removed (tolerance={near_zero_tolerance})"
        }
    )

    ds_anom[cpol_cleaned_var] = ds_anom[cpol_var].copy()
    ds_anom[cpol_cleaned_var].data = cpol
    ds_anom[cpol_cleaned_var] = ds_anom[cpol_cleaned_var].assign_attrs(
        {
            "earthcarekit": f"Added by earthcarekit: Rolling mean applied (w={rolling_w}) and near-zero values removed (tolerance={near_zero_tolerance})"
        }
    )

    # if formula == "x/c":
    ds_anom[ray_cleaned_var] = ds_anom[ray_var].copy()
    ds_anom[ray_cleaned_var].data = ray
    ds_anom[ray_cleaned_var] = ds_anom[ray_cleaned_var].assign_attrs(
        {
            "earthcarekit": f"Added by earthcarekit: Rolling mean applied (w={rolling_w}) and near-zero values removed (tolerance={near_zero_tolerance})"
        }
    )

    # Ratio profile from the along-track mean profiles of the cleaned signals
    # (ratio of means, not mean of ratios).
    ratio_mean = _calc(
        nan_mean(cpol, axis=0),
        nan_mean(xpol, axis=0),
        nan_mean(ray, axis=0),
    )

    ds_anom[ratio_from_means_var] = xr.DataArray(
        data=ratio_mean,
        dims=[height_dim],
        attrs={
            "long_name": _get_long_name(),
            "units": "",
            "earthcarekit": "Added by earthcarekit: Scattering ratio profile calculated from the mean profiles",
        },
    )

    return ds_anom

get_product_info

get_product_info(filepath, warn=False, must_exist=True)

Gather all info contained in the EarthCARE product's file path.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
def get_product_info(
    filepath: str,
    warn: bool = False,
    must_exist: bool = True,
) -> ProductInfo:
    """Gather all info contained in the EarthCARE product's file path.

    Args:
        filepath: Path or URL to an EarthCARE product (`.h5`) or header (`.HDR`) file.
        warn: If True, warn about missing companion product/header files. Defaults to False.
        must_exist: If True, the file must exist on disk and match the strict `.h5`
            naming pattern. Automatically disabled for URLs. Defaults to True.

    Returns:
        ProductInfo: Metadata parsed from the file name (mission ID, agency, latency,
            baseline, file type, sensing/processing times and, for product files,
            orbit number and frame ID).

    Raises:
        FileNotFoundError: If `must_exist` is True and the file does not exist.
        ValueError: If the file name matches no known EarthCARE naming pattern.
    """
    # URLs are mapped to a local-style path; existence can not be checked remotely.
    if _is_url(filepath):
        filepath = _get_path_from_url(filepath)
        must_exist = False

    filepath = os.path.abspath(filepath)

    if must_exist and not os.path.exists(filepath):
        raise FileNotFoundError(f"File does not exist: {filepath}")

    # Product file name pattern: agency/latency/baseline, file type, two
    # timestamps, then a 5-digit orbit number and a frame letter (A-H).
    # The strict ".h5" suffix is only enforced when the file must exist.
    if must_exist:
        pattern = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._..._.._\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{5}[ABCDEFGH]\.h5"
        )
    else:
        pattern = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._..._.._\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{5}[ABCDEFGH].*"
        )
    is_match = bool(pattern.fullmatch(filepath))

    if not is_match:
        # Fallback: orbit-style file names that end in a 4-digit number and
        # carry no frame ID.
        pattern_orbit_file = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._......_\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{4}.*"
        )
        is_match = bool(pattern_orbit_file.fullmatch(filepath))

        if not is_match:
            raise ValueError(f"EarthCARE product has invalid file name: {filepath}")

        # Parse the fixed-position fields of the file name:
        # [0:3] mission, [4] agency, [5] latency, [6:8] baseline, [9:19] file type,
        # [20:35] sensing time, [37:52] processing time.
        filename = os.path.basename(filepath).removesuffix(".h5")
        mission_id = FileMissionID.from_input(filename[0:3])
        agency = FileAgency.from_input(filename[4])
        latency = FileLatency.from_input(filename[5])
        baseline = filename[6:8]
        file_type = FileType.from_input(filename[9:19])
        start_sensing_time: pd.Timestamp
        try:
            start_sensing_time = pd.Timestamp(filename[20:35])
        except ValueError as e:
            # Unparsable timestamps degrade to NaT instead of failing.
            start_sensing_time = pd.NaT  # type: ignore
        start_processing_time: pd.Timestamp
        try:
            start_processing_time = pd.Timestamp(filename[37:52])
        except ValueError as e:
            start_processing_time = pd.NaT  # type: ignore

        # Orbit-style names carry no orbit number / frame ID fields.
        info = ProductInfo(
            mission_id=mission_id,
            agency=agency,
            latency=latency,
            baseline=baseline,
            file_type=file_type,
            start_sensing_time=start_sensing_time,
            start_processing_time=start_processing_time,
            orbit_number=0,
            frame_id="",
            orbit_and_frame="",
            name=filename,
            filepath=filepath,
            hdr_filepath="",
        )

        return info

    # Locate the companion product (.h5) and header (.HDR) files; missing
    # companions are reported as empty strings (optionally with a warning).
    product_filepath = filepath.removesuffix(".h5").removesuffix(".HDR") + ".h5"
    if not os.path.exists(product_filepath):
        if warn:
            msg = f"Missing product file: {product_filepath}"
            warnings.warn(msg)
        product_filepath = ""

    hdr_filepath = filepath.removesuffix(".h5").removesuffix(".HDR") + ".HDR"
    if not os.path.exists(hdr_filepath):
        if warn:
            msg = f"Missing product header file: {hdr_filepath}"
            warnings.warn(msg)
        hdr_filepath = ""

    # Same fixed-position fields as above, plus orbit number and frame ID:
    # [54:59] orbit number, [59] frame ID.
    filename = os.path.basename(filepath).removesuffix(".h5").removesuffix(".HDR")
    mission_id = FileMissionID.from_input(filename[0:3])
    agency = FileAgency.from_input(filename[4])
    latency = FileLatency.from_input(filename[5])
    baseline = filename[6:8]
    file_type = FileType.from_input(filename[9:19])
    start_sensing_time = pd.Timestamp(filename[20:35])
    start_processing_time = pd.Timestamp(filename[37:52])
    orbit_number = int(filename[54:59])
    frame_id = filename[59]
    orbit_and_frame = filename[54:60]

    info = ProductInfo(
        mission_id=mission_id,
        agency=agency,
        latency=latency,
        baseline=baseline,
        file_type=file_type,
        start_sensing_time=start_sensing_time,
        start_processing_time=start_processing_time,
        orbit_number=orbit_number,
        frame_id=frame_id,
        orbit_and_frame=orbit_and_frame,
        name=filename,
        filepath=product_filepath,
        hdr_filepath=hdr_filepath,
    )

    return info

get_product_infos

get_product_infos(filepaths, warn=False, must_exist=True)

Extracts product metadata from EarthCARE product file paths (e.g. file_type, orbit_number, frame_id, baseline, ...).

Parameters:

Name Type Description Default
filepaths str | list[str] | NDArray | DataFrame | Dataset

Input sources for EarthCARE product files. Can be one of - str -> A single file path. - list[str] or numpy.ndarray -> A list or array of file paths. - pandas.DataFrame -> Must contain a 'filepath' column. - xarray.Dataset -> Must have encoding with attribute 'source' (str) or 'sources' (list[str]).

required
**kwargs

Additional arguments passed to get_product_info().

required

Returns:

Name Type Description
ProductDataFrame ProductDataFrame

A dataframe containing extracted product information.

Source code in earthcarekit/utils/read/product/file_info/product_info.py
def get_product_infos(
    filepaths: str | list[str] | NDArray | pd.DataFrame | xr.Dataset,
    warn: bool = False,
    must_exist: bool = True,
) -> "ProductDataFrame":
    """
    Extracts product metadata from EarthCARE product file paths (e.g. file_type, orbit_number, frame_id, baseline, ...).

    Args:
        filepaths:
            Input sources for EarthCARE product files. Can be one of
            - `str` -> A single file path.
            - `list[str]` or `numpy.ndarray` -> A list or array of file paths.
            - `pandas.DataFrame` -> Must contain a 'filepath' column.
            - `xarray.Dataset` -> Must have encoding with attribute 'source' (`str`) or 'sources' (`list[str]`).
        warn:
            Passed on to `get_product_info()`; additionally warns about file paths
            that could not be parsed and were skipped. Defaults to False.
        must_exist:
            Passed on to `get_product_info()`. Defaults to True.

    Returns:
        ProductDataFrame: A dataframe containing extracted product information.

    Raises:
        ValueError: If a Dataset without source/sources encoding or a DataFrame
            without a 'filepath' column is given.
    """
    _filepaths: list[str] | NDArray
    if isinstance(filepaths, (str, np.str_)):
        _filepaths = [str(filepaths)]
    elif isinstance(filepaths, xr.Dataset):
        ds: xr.Dataset = filepaths
        if not hasattr(ds, "encoding"):
            raise ValueError("Dataset missing encoding attribute.")
        elif "source" in ds.encoding:
            _filepaths = [ds.encoding["source"]]
        elif "sources" in ds.encoding:
            _filepaths = ds.encoding["sources"]
        else:
            raise ValueError("Dataset encoding does not contain source or sources.")
    elif isinstance(filepaths, pd.DataFrame):
        if "filepath" in filepaths:
            _filepaths = filepaths["filepath"].to_numpy()
        else:
            raise ValueError(
                """Given dataframe does not contain a column of file paths. A valid file path column name is "filepath"."""
            )
    else:
        _filepaths = filepaths

    infos = []
    for filepath in _filepaths:
        try:
            infos.append(
                get_product_info(filepath, warn=warn, must_exist=must_exist).to_dict()
            )
        except ValueError:
            # Paths that do not follow the EarthCARE naming convention are skipped;
            # surface this to the caller when warnings were requested.
            if warn:
                warnings.warn(f"Skipping invalid product file path: {filepath}")
            continue
    pdf = ProductDataFrame(infos)
    pdf.validate_columns()
    return pdf

read_any

read_any(input, **kwargs)

Reads various input types and returns an xarray.Dataset.

This function can read
  • EarthCARE product files (.h5)
  • NetCDF files (.nc)
  • Manually processed PollyXT output files (.txt)

Parameters:

Name Type Description Default
input str | Dataset

File path or existing Dataset.

required
**kwargs

Additional keyword arguments for specific readers.

{}

Returns:

Type Description
Dataset

xr.Dataset: Opened dataset.

Raises:

Type Description
ValueError

If the file type is not supported.

TypeError

If the input type is invalid.

Source code in earthcarekit/utils/read/_read_any.py
def read_any(input: str | xr.Dataset, **kwargs) -> xr.Dataset:
    """Reads various input types and returns an `xarray.Dataset`.

    This function can read:
        - EarthCARE product files (`.h5`)
        - NetCDF files (`.nc`)
        - Manually processed PollyXT output files (`.txt`)

    Args:
        input (str | xr.Dataset): File path or existing Dataset.
        **kwargs: Additional keyword arguments for specific readers.

    Returns:
        xr.Dataset: Opened dataset.

    Raises:
        ValueError: If the file type is not supported.
        TypeError: If the input type is invalid.
    """
    # Already opened datasets pass straight through.
    if isinstance(input, xr.Dataset):
        return input
    if not isinstance(input, str):
        raise TypeError(f"Invalid type '{type(input).__name__}' for input.")

    filepath = input

    # EarthCARE products are recognized by their file name, not their extension.
    if is_earthcare_product(filepath=filepath):
        return read_product(filepath, **kwargs)

    # Dispatch remaining files on their (case-insensitive) extension.
    extension = os.path.splitext(os.path.basename(filepath))[1].lower()
    if extension == ".txt":
        return read_polly(filepath)
    if extension == ".nc":
        return read_nc(filepath, **kwargs)

    raise ValueError(f"Reading of file not supported: <{input}>")

read_header_data

read_header_data(source: str) -> Dataset
read_header_data(source: Dataset) -> Dataset
read_header_data(source)

Opens the product header groups of a EarthCARE file as a xarray.Dataset.

Source code in earthcarekit/utils/read/product/header_group.py
def read_header_data(source: str | xr.Dataset) -> xr.Dataset:
    """Opens the product header groups of a EarthCARE file as a `xarray.Dataset`.

    Args:
        source: Path to an EarthCARE product file, or an already opened dataset
            whose encoding contains the source file path.

    Returns:
        xr.Dataset: Merged header data with duplicate variable names made unique,
            known timestamp variables converted to `numpy.datetime64`, and
            size-1 dimensions squeezed out.

    Raises:
        ValueError: If the given dataset has no source path in its encoding.
        TypeError: If `source` is neither a `str` nor a `xarray.Dataset`.
    """
    if isinstance(source, str):
        filepath = source
    elif isinstance(source, xr.Dataset):
        filepath = source.encoding.get("source", None)
        if filepath is None:
            raise ValueError(f"Dataset missing source attribute")
    else:
        raise TypeError("Expected 'str' or 'xarray.Dataset'")

    groups = xr.open_groups(filepath)
    header_groups = {n: g for n, g in groups.items() if "HeaderData" in n}

    # Rename duplicate vars: the first occurrence keeps its plain name, later
    # occurrences get the group name as a prefix. Renamed names are tracked too,
    # so a prefixed name can not collide with a later occurrence either.
    seen_vars: set[str] = set()
    header_datasets = []
    for group_name, group_ds in header_groups.items():
        renamed_ds = group_ds.copy()
        for var in group_ds.data_vars:
            if var in seen_vars:
                new_name = f"{group_name.split('/')[-1]}_{var}"
                renamed_ds = renamed_ds.rename({var: new_name})
                seen_vars.add(new_name)
            else:
                seen_vars.add(var)
        header_datasets.append(renamed_ds)

    ds = xr.merge(header_datasets)

    # Convert timestamps to numpy datetime (raw values carry a "UTC=" prefix)
    for var in [
        "Creation_Date",
        "Validity_Start",
        "Validity_Stop",
        "ANXTime",
        "frameStartTime",
        "frameStopTime",
        "processingStartTime",
        "processingStopTime",
        "sensingStartTime",
        "sensingStopTime",
        "stateVectorTime",
    ]:
        if var in ds:
            raw = ds[var].values
            formatted = np.char.replace(raw, "UTC=", "")
            ds[var].values = formatted.astype("datetime64[ns]")

    # Ensure that strings are correctly decoded
    for var in ["frameID"]:
        if var in ds:
            ds = convert_scalar_var_to_str(ds, var)

    # Remove dimensions of size == 1
    ds = ds.squeeze()

    return ds

read_nc

read_nc(input, modify=True, in_memory=False, **kwargs)

Returns an xarray.Dataset from a Dataset or NetCDF file path, optionally loaded into memory.

Parameters:

Name Type Description Default
input Dataset or str

Path to a NetCDF file. If an already opened xarray.Dataset object is passed, it is returned as is.

required
modify bool

If True, default modifications to the opened dataset will be applied (e.g., converting heights in Polly data from height a.g.l. to height above mean sea level).

True
in_memory bool

If True, ensures the dataset is fully loaded into memory. Defaults to False.

False
**kwargs

Key-word arguments passed to xarray.open_dataset().

{}

Returns:

Type Description
Dataset

xarray.Dataset: The resulting dataset.

Raises:

Type Description
TypeError

If input is not a Dataset or string.

Source code in earthcarekit/utils/read/_read_nc.py
def read_nc(
    input: str | xr.Dataset,
    modify: bool = True,
    in_memory: bool = False,
    **kwargs,
) -> xr.Dataset:
    """Returns an `xarray.Dataset` from a Dataset or NetCDF file path, optionally loaded into memory.

    Args:
        input (xarray.Dataset or str): Path to a NetCDF file. If an already opened
            `xarray.Dataset` object is passed, it is returned as is.
        modify (bool): If True, default modifications to the opened dataset will be applied
            (e.g., converting heights in Polly data from height a.g.l. to height above mean sea level).
        in_memory (bool, optional): If True, ensures the dataset is fully loaded into memory. Defaults to False.
        **kwargs: Key-word arguments passed to `xarray.open_dataset()`.

    Returns:
        xarray.Dataset: The resulting dataset.

    Raises:
        TypeError: If input is not a Dataset or string.
    """
    # Already opened datasets pass straight through.
    if isinstance(input, xr.Dataset):
        return input
    if not isinstance(input, str):
        raise TypeError(
            f"Invalid input type! Expecting a opened NetCDF dataset (xarray.Dataset) or a path to a NetCDF file."
        )

    if not in_memory:
        return _read_nc(input, modify=modify, **kwargs)

    # Fully load the data into memory, then release the underlying file handle.
    with _read_nc(input, modify=modify, **kwargs) as opened:
        return opened.load()

read_polly

read_polly(input)

Reads manually processed PollyXT output text files as xarray.Dataset or returns an already open one.

Source code in earthcarekit/utils/read/_read_polly.py
def read_polly(input: str | xr.Dataset) -> xr.Dataset:
    """Reads manually processed PollyXT output text files as `xarray.Dataset` or returns an already open one."""

    if isinstance(input, xr.Dataset):
        return input

    # Tab-separated text export; ignore undecodable bytes.
    with open(input, "r", encoding="utf-8", errors="ignore") as f:
        df = pd.read_csv(f, sep="\t")

    # Parse column headers into (name, long_name, units) and de-duplicate names.
    parsed_columns = [_parse_column_name(c) for c in df.columns]
    unique_names = _make_column_names_unique([c.name for c in parsed_columns])
    df.columns = pd.Index(unique_names)

    ds = xr.Dataset.from_dataframe(df)
    ds = ds.assign_coords(index=ds.height.values).rename({"index": "vertical"})
    if "time" not in ds:
        # Placeholder epoch timestamp when the file carries no time column.
        ds = ds.assign({"time": np.datetime64("1970-01-01T00:00:00.000", "ms")})

    # Put time first, keep all other variables in their original order.
    ds = ds[["time"] + [v for v in ds.data_vars if v != "time"]]

    for col in parsed_columns:
        # Convert kilometers -> meters and mega-scaled units -> base units
        # before attaching the (updated) units as attributes.
        if col.units == "km":
            ds[col.name].values = ds[col.name].values * 1e3
            col.units = col.units.replace("k", "")
        elif col.units in ["Mm-1 sr-1", "Mm-1", "Msr-1"]:
            ds[col.name].values = ds[col.name].values / 1e6
            col.units = col.units.replace("M", "")

        ds[col.name] = ds[col.name].assign_attrs(
            dict(
                long_name=col.long_name,
                units=col.units,
            )
        )
    return ds

read_product

read_product(
    input,
    trim_to_frame=True,
    modify=DEFAULT_READ_EC_PRODUCT_MODIFY,
    header=DEFAULT_READ_EC_PRODUCT_HEADER,
    meta=DEFAULT_READ_EC_PRODUCT_META,
    ensure_nans=DEFAULT_READ_EC_PRODUCT_ENSURE_NANS,
    in_memory=False,
    **kwargs
)

Returns an xarray.Dataset from a Dataset or EarthCARE file path, optionally loaded into memory.

Parameters:

Name Type Description Default
input str or Dataset

Path to a EarthCARE file. If a xarray.Dataset is given it will be returned as is.

required
trim_to_frame bool

Whether to trim the dataset to latitude frame bounds. Defaults to True.

True
modify bool

If True, default modifications to the opened dataset will be applied (e.g., renaming dimension corresponding to height to "vertical"). Defaults to True.

DEFAULT_READ_EC_PRODUCT_MODIFY
header bool

If True, all header data will be included in the dataframe. Defaults to False.

DEFAULT_READ_EC_PRODUCT_HEADER
meta bool

If True, select meta data from header (like orbit number and frame ID) will be included in the dataframe. Defaults to True.

DEFAULT_READ_EC_PRODUCT_META
ensure_nans bool

If True, ensures that _FillValues are set to NaNs even if encoding of _FillValues or dtype is missing. Be aware, if True increases reading time. Defaults to False.

DEFAULT_READ_EC_PRODUCT_ENSURE_NANS
in_memory bool

If True, ensures the dataset is fully loaded into memory. Defaults to False.

False

Returns:

Type Description
Dataset

xarray.Dataset: The resulting dataset.

Raises:

Type Description
TypeError

If input is not a Dataset or string.

Source code in earthcarekit/utils/read/product/_generic.py
def read_product(
    input: str | Dataset,
    trim_to_frame: bool = True,
    modify: bool = DEFAULT_READ_EC_PRODUCT_MODIFY,
    header: bool = DEFAULT_READ_EC_PRODUCT_HEADER,
    meta: bool = DEFAULT_READ_EC_PRODUCT_META,
    ensure_nans: bool = DEFAULT_READ_EC_PRODUCT_ENSURE_NANS,
    in_memory: bool = False,
    **kwargs,
) -> Dataset:
    """Returns an `xarray.Dataset` from a Dataset or EarthCARE file path, optionally loaded into memory.

    Args:
        input (str or xarray.Dataset): Path to a EarthCARE file. If a `xarray.Dataset` is given it will be returned as is.
        trim_to_frame (bool, optional): Whether to trim the dataset to latitude frame bounds. Defaults to True.
        modify (bool, optional): If True, default modifications to the opened dataset will be applied
            (e.g., renaming dimension corresponding to height to "vertical"). Defaults to True.
        header (bool, optional): If True, all header data will be included in the dataframe. Defaults to False.
        meta (bool, optional): If True, select meta data from header (like orbit number and frame ID) will be included in the dataframe. Defaults to True.
        ensure_nans (bool, optional): If True, ensures that _FillValues are set to NaNs even if encoding of _FillValues or dtype is missing.
            Be aware, if True increases reading time. Defaults to False.
        in_memory (bool, optional): If True, ensures the dataset is fully loaded into memory. Defaults to False.

    Returns:
        xarray.Dataset: The resulting dataset.

    Raises:
        TypeError: If input is not a Dataset or string.
    """
    # Already opened datasets pass straight through.
    if isinstance(input, Dataset):
        return input
    if not isinstance(input, str):
        raise TypeError(
            f"Invalid input type! Expecting a opened EarthCARE dataset (xarray.Dataset) or a path to a EarthCARE product."
        )

    # Collect the reader options once and forward them to the internal reader.
    read_kwargs = dict(
        trim_to_frame=trim_to_frame,
        modify=modify,
        header=header,
        meta=meta,
        ensure_nans=ensure_nans,
        **kwargs,
    )
    if not in_memory:
        return _read_product(filepath=input, **read_kwargs)

    # Fully load the data into memory, then release the underlying file handle.
    with _read_product(filepath=input, **read_kwargs) as opened:
        return opened.load()

read_products

read_products(
    filepaths,
    zoom_at=None,
    along_track_dim=ALONG_TRACK_DIM,
    func=None,
    func_inputs=None,
    max_num_files=8,
    coarsen=True,
)

Read and concatenate a sequence of EarthCARE frames into a single xarray Dataset.

By default, the dataset is coarsened according to the number of input frames (e.g., combining 3 products averages every 3 profiles, so the along-track dimension remains comparable to a single product). Optionally applies a processing function to each frame and zooms in on a specific region (defined by zoom_at) without coarsening. Coarsening can also be turned off, but this might cause memory issues.

Parameters:

Name Type Description Default
filepaths Sequence[str] or DataFrame

EarthCARE product file paths as a list or a DataFrame with metadata including filepath, orbit_number, and frame_id.

required
zoom_at float

If set, selects only a zoomed-in portion of the frames around this fractional index. Defaults to None.

None
along_track_dim str

Name of the dimension to concatenate along. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
func Callable

Function to apply to each frame after loading. Defaults to None.

None
func_inputs Sequence[dict]

Optional per-frame arguments to pass to func. Defaults to None.

None
max_num_files int

Max. number of files that are allowed to be loaded at once. A ValueError is raised if above. Defaults to 8 (e.g., full orbit).

8
coarsen bool

If True, read datasets are coarsened depending on the number of given files. Only applicable when not zooming. Defaults to True.

True

Returns:

Name Type Description
Dataset Dataset

Concatenated dataset with all frames along along_track_dim.

Source code in earthcarekit/utils/read/product/_concat.py
def read_products(
    filepaths: Sequence[str] | NDArray[np.str_] | pd.DataFrame,
    zoom_at: float | None = None,
    along_track_dim: str = ALONG_TRACK_DIM,
    func: Callable | None = None,
    func_inputs: Sequence[dict] | None = None,
    max_num_files: int = 8,
    coarsen: bool = True,
) -> Dataset:
    """Read and concatenate a sequence of EarthCARE frames into a single xarray Dataset.

    By default, the dataset is coarsened according to the number of input frames (e.g.,
    combining 3 products averages every 3 profiles, so the along-track dimension remains
    comparable to a single product). Optionally applies a processing function to each
    frame and zooms in on a specific region (defined by `zoom_at`) without coarsening.
    Coarsening can also be turned off, but this might cause memory issues.

    Args:
        filepaths (Sequence[str] or pandas.DataFrame):
            EarthCARE product file paths as a list or a DataFrame with metadata
            including `filepath`, `orbit_number`, and `frame_id`.
        zoom_at (float, optional):
            If set, selects only a zoomed-in portion of the frames around this
            fractional index. Defaults to None.
        along_track_dim (str, optional):
            Name of the dimension to concatenate along. Defaults to ALONG_TRACK_DIM.
        func (Callable, optional):
            Function to apply to each frame after loading. Defaults to None.
        func_inputs (Sequence[dict], optional):
            Optional per-frame arguments to pass to `func`. Defaults to None.
        max_num_files (int, optional):
            Max. number of files that are allowed to be loaded at once.
            A `ValueError` is raised if above. Defaults to 8 (e.g., full orbit).
        coarsen (bool, optional):
            If True, read datasets are coarsened depending on the number of given files.
            Only applicable when not zooming. Defaults to True.

    Returns:
        Dataset: Concatenated dataset with all frames along `along_track_dim`.

    Raises:
        ValueError: If no file paths are given or more than `max_num_files` are given.
    """
    # Normalize the input into a list of file paths sorted by orbit and frame.
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    elif isinstance(filepaths, pd.DataFrame):
        df = filepaths.sort_values(by="orbit_and_frame")
        filepaths = df["filepath"].tolist()
    else:
        df = ProductDataFrame.from_files(list(filepaths)).sort_values(
            by="orbit_and_frame"
        )
        df.validate_columns()
        filepaths = df["filepath"].tolist()

    if len(filepaths) == 0:
        raise ValueError(f"Given sequence of product files paths is empty")
    elif len(filepaths) == 1:
        warnings.warn(f"Can not concatenate frames since only one file path was given")
        return read_product(filepaths[0])
    elif len(filepaths) > max_num_files:
        raise ValueError(
            f"Too many files provided: {len(filepaths)} (currently maximum allowed is {max_num_files}). "
            "Please reduce the number of files or increase the allowed amount by setting the argument max_num_files."
        )
    elif len(filepaths) > 8:
        # Only reachable when the caller raised max_num_files above the default.
        warnings.warn(
            f"You provided {len(filepaths)} files, which is more than one full orbit (8 files). "
            "Processing might take longer than usual."
        )

    # # Construct filename suffix from orbit/frame numbers
    # orbit_start = str(df["orbit_number"].iloc[0]).zfill(5)
    # orbit_end = str(df["orbit_number"].iloc[-1]).zfill(5)
    # frame_start = df["frame_id"].iloc[0]
    # frame_end = df["frame_id"].iloc[-1]

    # if orbit_start == orbit_end:
    #     oaf_string = (
    #         f"{orbit_start}{frame_start}"
    #         if frame_start == frame_end
    #         else f"{orbit_start}{frame_start}-{frame_end}"
    #     )
    # else:
    #     oaf_string = f"{orbit_start}{frame_start}-{orbit_end}{frame_end}"

    def apply_func(ds: Dataset, i: int) -> Dataset:
        """Apply a processing function to a dataset if specified."""
        if func is None:
            return ds
        if func_inputs is None:
            return func(ds)
        if i < len(func_inputs):
            return func(ds, **func_inputs[i])
        raise IndexError("Too few function inputs provided")

    num_files = len(filepaths)
    ds: xr.Dataset | None = None

    if zoom_at is not None:
        # Zoomed read: select portions of two adjacent frames.
        # With an integer zoom_at, floor == ceil, so exactly one whole frame
        # is selected; otherwise `offset` is the fractional position inside
        # the first selected frame.
        frame_indices = np.unique([int(np.floor(zoom_at)), int(np.ceil(zoom_at))])
        offset = zoom_at - frame_indices[0]
        filepaths = [filepaths[i] for i in frame_indices]

        for i, filepath in enumerate(filepaths):
            with read_product(filepath) as frame_ds:
                # func_inputs are indexed by the original frame position.
                frame_ds = apply_func(frame_ds, frame_indices[i])

                # Preserve original dtypes
                original_dtypes = {v: frame_ds[v].dtype for v in frame_ds.variables}

                # Select relevant portion of the frame: the tail of the first
                # frame and the head of the second, split at `offset`.
                n = len(frame_ds[along_track_dim])
                sel_slice = (
                    slice(int(np.floor(n * offset)), n)
                    if i == 0
                    else slice(0, int(np.ceil(n * offset)))
                )
                frame_ds = frame_ds.sel({along_track_dim: sel_slice})

                # Restore dtypes (selection may have changed them)
                for v, dtype in original_dtypes.items():
                    frame_ds[v] = frame_ds[v].astype(dtype)

                ds = (
                    frame_ds.copy()
                    if ds is None
                    else concat_datasets(
                        ds.copy(), frame_ds.copy(), dim=along_track_dim
                    )
                )

    else:
        # Full read and coarsen each frame
        for i, filepath in enumerate(filepaths):
            with read_product(filepath) as frame_ds:
                frame_ds = apply_func(frame_ds, i)

                if coarsen:
                    # Preserve original dtypes (coarsen-mean may upcast them).
                    original_dtypes = {v: frame_ds[v].dtype for v in frame_ds.variables}

                    coarsen_dims = {along_track_dim: num_files}

                    # Circular mean for longitude (plain averaging would be
                    # wrong across the antimeridian)
                    lon_coarse = (
                        frame_ds["longitude"]
                        .coarsen(coarsen_dims, boundary="trim")
                        .reduce(circular_mean_np)
                    )
                    # Temporarily strip attrs so the merge below can not conflict.
                    _tmp_attrs = lon_coarse.attrs
                    lon_coarse.attrs = {}

                    # Regular mean for the rest
                    rest = (
                        frame_ds.drop_vars("longitude")
                        .coarsen(coarsen_dims, boundary="trim")
                        .mean()  # type: ignore
                    )

                    # Merge results
                    frame_ds = xr.merge([lon_coarse, rest])
                    frame_ds["longitude"].attrs = _tmp_attrs

                    for v, dtype in original_dtypes.items():
                        frame_ds[v] = frame_ds[v].astype(dtype)

                ds = (
                    frame_ds
                    if ds is None
                    else concat_datasets(ds, frame_ds, dim=along_track_dim)
                )

    # Set output file sources
    if isinstance(ds, Dataset):
        ds.encoding["sources"] = list(filepaths)
        return ds
    else:
        raise RuntimeError(f"Bad implementation")

read_science_data

read_science_data(
    filepath, agency=None, ensure_nans=False, **kwargs
)

Opens the science data of a EarthCARE file as a xarray.Dataset.

Source code in earthcarekit/utils/read/product/science_group.py
def read_science_data(
    filepath: str,
    agency: Union["FileAgency", None] = None,
    ensure_nans: bool = False,
    **kwargs,
) -> xr.Dataset:
    """Opens the science data of a EarthCARE file as a `xarray.Dataset`."""
    from .file_info.agency import (
        FileAgency,  # Imported inside function to avoid circular import error
    )

    # Infer the producing agency from the file when not given explicitly.
    if agency is None:
        agency = FileAgency.from_input(filepath)

    if agency == FileAgency.ESA:
        # ESA products keep all science data in a single group.
        ds = xr.open_dataset(filepath, group="ScienceData", engine=_engine, **kwargs)
    elif agency == FileAgency.JAXA:
        # JAXA products split geolocation and measurements into two groups
        # that are opened separately and merged afterwards.
        group_kwargs = dict(engine=_engine, phony_dims="sort", **kwargs)
        ds_geo = xr.open_dataset(filepath, group="ScienceData/Geo", **group_kwargs)
        ds_data = xr.open_dataset(filepath, group="ScienceData/Data", **group_kwargs)
        ds = xr.merge([ds_data, ds_geo])
        # Merging drops the source path, so carry it over explicitly.
        ds.encoding["source"] = ds_data.encoding["source"]
    else:
        raise NotImplementedError()

    if ensure_nans:
        ds = _convert_all_fill_values_to_nan(ds)

    return ds

rebin_msi_to_jsg

rebin_msi_to_jsg(
    ds_msi,
    ds_xjsg,
    vars=None,
    k=4,
    eps=1e-12,
    lat_var=SWATH_LAT_VAR,
    lon_var=SWATH_LON_VAR,
    time_var=TIME_VAR,
    along_track_dim=ALONG_TRACK_DIM,
    across_track_dim=ACROSS_TRACK_DIM,
    lat_var_xjsg=SWATH_LAT_VAR,
    lon_var_xjsg=SWATH_LON_VAR,
    time_var_xjsg=TIME_VAR,
    along_track_dim_xjsg=ALONG_TRACK_DIM,
    across_track_dim_xjsg=ACROSS_TRACK_DIM,
)

Rebins variables from an MSI product dataset onto the geospatial lat/lon grid given by the related AUX_JSG_1D dataset.

This function interpolates selected variables from ds_msi onto the JSG grid from ds_xjsg using quick kd-tree nearest-neighbor search with scipy.spatial.cKDTree followed by averaging the k-nearest points using inverse distance weighting. The resulting dataframe matches the along- and across-track resolution of ds_xjsg.

Parameters:

Name Type Description Default
ds_msi Dataset | str

The source MSI dataset (e.g., MSI_RGR_1C, MSI_COP_2A, ...).

required
ds_xjsg Dataset | str

The target XJSG dataset.

required
vars list[str] | None

List of variable names from ds_msi to rebin. If None, all data variables are considered. Defaults to None.

None
k int

Number of nearest geo-spacial neighbors to include in the kd-tree search. Defaults to 4.

4
eps float

Numerical threshold to avoid division by zero in distance calculations during the kd-tree search. Defaults to 1e-12.

1e-12

Returns:

Type Description
Dataset

xr.Dataset: The MSI dataset with variables rebinned to the JSG grid.

Source code in earthcarekit/utils/read/product/_rebin_msi_to_jsg.py
def rebin_msi_to_jsg(
    ds_msi: xr.Dataset | str,
    ds_xjsg: xr.Dataset | str,
    vars: list[str] | None = None,
    k: int = 4,
    eps: float = 1e-12,
    lat_var: str = SWATH_LAT_VAR,
    lon_var: str = SWATH_LON_VAR,
    time_var: str = TIME_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    across_track_dim: str = ACROSS_TRACK_DIM,
    lat_var_xjsg: str = SWATH_LAT_VAR,
    lon_var_xjsg: str = SWATH_LON_VAR,
    time_var_xjsg: str = TIME_VAR,
    along_track_dim_xjsg: str = ALONG_TRACK_DIM,
    across_track_dim_xjsg: str = ACROSS_TRACK_DIM,
) -> xr.Dataset:
    """
    Rebins variables from an MSI product dataset onto the geo-spatial lat/lon grid given by the related AUX_JSG_1D dataset.

    This function interpolates selected variables from `ds_msi` onto the JSG grid from `ds_xjsg`
    using a quick kd-tree nearest-neighbor search with `scipy.spatial.cKDTree` followed
    by averaging the `k`-nearest points using inverse distance weighting. The resulting dataset
    matches the along- and across-track resolution of `ds_xjsg`.

    Args:
        ds_msi (xr.Dataset | str): The source MSI dataset (e.g., MSI_RGR_1C, MSI_COP_2A, ...) or a path to it.
        ds_xjsg (xr.Dataset | str): The target XJSG dataset or a path to it.
        vars (list[str] | None, optional): List of variable names from `ds_msi` to rebin.
            If None, all data variables are considered. Defaults to None.
        k (int, optional): Number of nearest geo-spatial neighbors to include in the kd-tree search.
            Defaults to 4.
        eps (float, optional): Numerical threshold to avoid division by zero in distance calculations during the kd-tree search.
            Defaults to 1e-12.

    Returns:
        xr.Dataset: The MSI dataset with variables rebinned to the JSG grid.

    Raises:
        KeyError: If any requested variable is not found in `ds_msi`.
    """

    def _read_msi() -> xr.Dataset:
        # Accept either an already opened dataset or a file path.
        if isinstance(ds_msi, str):
            return read_product(ds_msi)
        return ds_msi

    def _read_xjsg() -> xr.Dataset:
        if isinstance(ds_xjsg, str):
            return read_product(ds_xjsg)
        return ds_xjsg

    with (
        _read_msi() as ds_msi,
        _read_xjsg() as ds_xjsg,
    ):
        if vars is None:
            vars = [str(v) for v in ds_msi.variables]
        else:
            for var in vars:
                if var not in ds_msi.variables:
                    present_vars = [str(v) for v in ds_msi.variables]
                    raise KeyError(
                        f"""MSI dataset does not contain variable "{var}". Present variables are: {", ".join(present_vars)}"""
                    )

        # Rename the XJSG dims so both datasets share along/across-track dim names.
        ds_xjsg = ds_xjsg.copy().swap_dims(
            {
                along_track_dim_xjsg: along_track_dim,
                across_track_dim_xjsg: across_track_dim,
            }
        )

        # Keep the original MSI grid under "*_original" dims; rebinned variables
        # are written back under the target (JSG) dim names.
        new_ds_msi = ds_msi.copy().swap_dims(
            {
                along_track_dim: f"{along_track_dim}_original",
                across_track_dim: f"{across_track_dim}_original",
            }
        )
        new_ds_msi[time_var] = ds_xjsg[time_var_xjsg].copy()

        # Build ECEF coordinates for both grids so the kd-tree measures true
        # 3D distances instead of raw lat/lon degree differences.
        lat_msi = ds_msi[lat_var].values.flatten()
        lon_msi = ds_msi[lon_var].values.flatten()
        coords_msi = sequence_geo_to_ecef(lat_msi, lon_msi)

        lat_jsg = ds_xjsg[lat_var_xjsg].values.flatten()
        lon_jsg = ds_xjsg[lon_var_xjsg].values.flatten()
        coords_jsg = sequence_geo_to_ecef(lat_jsg, lon_jsg)

        tree = cKDTree(coords_msi)
        dists, idxs = tree.query(coords_jsg, k=k)

        dims: str | tuple[str, str]
        for var in vars:
            if ds_msi[var].dims == (along_track_dim, across_track_dim):
                dims = (along_track_dim, across_track_dim)

                values = ds_msi[var].values
                values_flat = values.flatten()

                mask_nan = np.isnan(values_flat[idxs])

                # Copy before masking: `dists` is reused for every variable in
                # this loop and must not be permanently overwritten with inf.
                _dists = dists.copy()
                _dists[mask_nan] = np.inf

                # Inverse distance weighting; NaN neighbors get (near-)zero
                # weight because their distance was set to inf above.
                if k > 1:
                    weights = 1.0 / (_dists + eps)
                    weights /= np.sum(weights, axis=1, keepdims=True)
                else:
                    weights = np.ones(idxs.shape)

                if k > 1:
                    _v = values_flat[idxs]

                    if np.issubdtype(_v.dtype, np.floating):
                        # Zero-out NaNs so the weighted sum stays finite, but
                        # keep rows where all neighbors are NaN as NaN.
                        m = np.all(np.isnan(_v), axis=1)
                        _v[np.isnan(_v)] = 0.0
                        _v[m] = np.nan

                    result = np.sum(_v * weights, axis=1)

                    new_values = result
                else:
                    new_values = values_flat[idxs]

                # Recover the 2D (along x across) target shape from the
                # configured XJSG latitude variable (not a hard-coded name).
                new_values = new_values.reshape(ds_xjsg[lat_var_xjsg].shape)

                new_var = f"{var}"
                new_ds_msi[new_var] = (dims, new_values)
                new_ds_msi[new_var].attrs = ds_msi[var].attrs
            elif var not in _SKIP_VARS and var in ds_msi and var in ds_xjsg:
                # Variables shared by both products are taken directly from JSG.
                new_ds_msi[var] = ds_xjsg[var].copy()
                new_ds_msi[var].attrs = ds_xjsg[var].attrs
            else:
                continue

        return new_ds_msi

rebin_xmet_to_vertical_track

rebin_xmet_to_vertical_track(
    ds_xmet,
    ds_vert,
    vars=None,
    k=4,
    eps=1e-12,
    lat_var=TRACK_LAT_VAR,
    lon_var=TRACK_LON_VAR,
    time_var=TIME_VAR,
    height_var=HEIGHT_VAR,
    along_track_dim=ALONG_TRACK_DIM,
    height_dim=VERTICAL_DIM,
    xmet_lat_var="latitude",
    xmet_lon_var="longitude",
    xmet_height_var="geometrical_height",
    xmet_height_dim="height",
    xmet_horizontal_grid_dim="horizontal_grid",
)

Rebins variables from an AUX_MET_1D (XMET) dataset onto the vertical curtain track given by another dataset (e.g. ATL_EBD_2A).

This function interpolates selected variables from ds_xmet onto an EarthCARE vertical track given in ds_vert, using a quick horizontal kd-tree nearest-neighbor search with scipy.spatial.cKDTree followed by averaging the k-nearest vertical XMET profiles using inverse distance weighting. The resulting profiles are then interpolated in the vertical to match the height resolution of ds_vert.

Parameters:

Name Type Description Default
ds_xmet Dataset | str

The source XMET dataset from which vertical curtain along track will be interpolated.

required
ds_vert Dataset | str

The target dataset containing the vertical curtain track.

required
vars list[str] | None

List of variable names from ds_xmet to rebin. If None, all data variables are considered.

None
k int

Number of nearest horizontal neighbors to include in the kd-tree search. Defaults to 4.

4
eps float

Numerical threshold to avoid division by zero in distance calculations during the kd-tree search. Defaults to 1e-12.

1e-12
lat_var str

Name of the latitude variable in ds_vert. Defaults to TRACK_LAT_VAR.

TRACK_LAT_VAR
lon_var str

Name of the longitude variable in ds_vert. Defaults to TRACK_LON_VAR.

TRACK_LON_VAR
time_var str

Name of the time variable in ds_vert. Defaults to TIME_VAR.

TIME_VAR
height_var str

Name of the height variable in ds_vert. Defaults to HEIGHT_VAR.

HEIGHT_VAR
along_track_dim str

Name of the along-track dimension in ds_vert. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
height_dim str

Name of the vertical or height dimension in ds_vert. Defaults to VERTICAL_DIM.

VERTICAL_DIM
xmet_lat_var str

Name of the latitude variable in ds_xmet. Defaults to "latitude".

'latitude'
xmet_lon_var str

Name of the longitude variable in ds_xmet. Defaults to "longitude".

'longitude'
xmet_height_var str

Name of the height variable in ds_xmet. Defaults to "geometrical_height".

'geometrical_height'
xmet_height_dim str

Name of the vertical dimension in ds_xmet. Defaults to "height".

'height'
xmet_horizontal_grid_dim str

Name of the horizontal grid dimension in ds_xmet. Defaults to "horizontal_grid".

'horizontal_grid'

Returns:

Type Description
Dataset

xr.Dataset: A new dataset containing the selected XMET variables interpolated to the grid of the vertical curtain given in ds_vert. This new dataset has the same along-track and vertical dimensions as ds_vert.

Raises:

Type Description
KeyError

If any specified variable or coordinate name is not found in ds_xmet.

Source code in earthcarekit/utils/read/product/_rebin_xmet_to_vertical_track.py
def rebin_xmet_to_vertical_track(
    ds_xmet: xr.Dataset | str,
    ds_vert: xr.Dataset | str,
    vars: list[str] | None = None,
    k: int = 4,
    eps: float = 1e-12,
    lat_var: str = TRACK_LAT_VAR,
    lon_var: str = TRACK_LON_VAR,
    time_var: str = TIME_VAR,
    height_var: str = HEIGHT_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    height_dim: str = VERTICAL_DIM,
    xmet_lat_var: str = "latitude",
    xmet_lon_var: str = "longitude",
    xmet_height_var: str = "geometrical_height",
    xmet_height_dim: str = "height",
    xmet_horizontal_grid_dim: str = "horizontal_grid",
) -> xr.Dataset:
    """
    Rebins variables from an AUX_MET_1D (XMET) dataset onto the vertical curtain track given by another dataset (e.g. ATL_EBD_2A).

    This function interpolates selected variables from `ds_xmet` onto an EarthCARE
    vertical track given in `ds_vert`, using a quick horizontal kd-tree nearest-neighbor search with `scipy.spatial.cKDTree` followed
    by averaging the `k`-nearest vertical XMET profiles using inverse distance weighting. The resulting
    profiles are then interpolated in the vertical to match the height resolution of `ds_vert`.

    Args:
        ds_xmet (xr.Dataset | str): The source XMET dataset from which vertical curtain along track will be interpolated.
        ds_vert (xr.Dataset | str): The target dataset containing the vertical curtain track.
        vars (list[str] | None, optional): List of variable names from `ds_xmet` to rebin.
            If None, all data variables are considered. The given list is not modified.
        k (int, optional): Number of nearest horizontal neighbors to include in the kd-tree search.
            Defaults to 4.
        eps (float, optional): Numerical threshold to avoid division by zero in distance calculations during the kd-tree search.
            Defaults to 1e-12.
        lat_var (str, optional): Name of the latitude variable in `ds_vert`.
            Defaults to TRACK_LAT_VAR.
        lon_var (str, optional): Name of the longitude variable in `ds_vert`.
            Defaults to TRACK_LON_VAR.
        time_var (str, optional): Name of the time variable in `ds_vert`.
            Defaults to TIME_VAR.
        height_var (str, optional): Name of the height variable in `ds_vert`.
            Defaults to HEIGHT_VAR.
        along_track_dim (str, optional): Name of the along-track dimension in `ds_vert`.
            Defaults to ALONG_TRACK_DIM.
        height_dim (str, optional): Name of the vertical or height dimension in `ds_vert`.
            Defaults to VERTICAL_DIM.
        xmet_lat_var (str, optional): Name of the latitude variable in `ds_xmet`.
            Defaults to "latitude".
        xmet_lon_var (str, optional): Name of the longitude variable in `ds_xmet`.
            Defaults to "longitude".
        xmet_height_var (str, optional): Name of the height variable in `ds_xmet`.
            Defaults to "geometrical_height".
        xmet_height_dim (str, optional): Name of the vertical dimension in `ds_xmet`.
            Defaults to "height".
        xmet_horizontal_grid_dim (str, optional): Name of the horizontal grid dimension in `ds_xmet`.
            Defaults to "horizontal_grid".

    Returns:
        xr.Dataset: A new dataset containing the selected XMET variables interpolated to the grid of the
            vertical curtain given in `ds_vert`. This new dataset has the same along-track and vertical
            dimensions as `ds_vert`.

    Raises:
        KeyError: If any specified variable or coordinate name is not found in `ds_xmet`.
    """

    def _read_xmet() -> xr.Dataset:
        # Accept either an already opened dataset or a file path.
        if isinstance(ds_xmet, str):
            return read_product_xmet(ds_xmet)
        return ds_xmet

    def _read_vert() -> xr.Dataset:
        if isinstance(ds_vert, str):
            return read_product(ds_vert)
        return ds_vert

    with (
        _read_xmet() as ds_xmet,
        _read_vert() as ds_vert,
    ):

        if vars is None:
            vars = [str(v) for v in ds_xmet.variables]
        else:
            for var in vars:
                if var not in ds_xmet.variables:
                    present_vars = [str(v) for v in ds_xmet.variables]
                    raise KeyError(
                        f"""X-MET dataset does not contain variable "{var}". Present variables are: {", ".join(present_vars)}"""
                    )
            # Work on a copy so the caller's list is not mutated by the
            # `vars.remove(...)` below.
            vars = list(vars)

        new_ds_xmet = ds_xmet.copy().swap_dims({xmet_height_dim: "tmp_xmet_height"})
        new_ds_xmet[time_var] = ds_vert[time_var].copy()
        new_ds_xmet[height_var] = ds_vert[height_var].copy()

        # ECEF coordinates of the XMET horizontal grid and of the track,
        # so kd-tree distances are true 3D distances.
        hgrid_lat = ds_xmet[xmet_lat_var].values.flatten()
        hgrid_lon = ds_xmet[xmet_lon_var].values.flatten()
        hgrid_alt = ds_xmet[xmet_height_var].values
        hgrid_coords = sequence_geo_to_ecef(hgrid_lat, hgrid_lon)

        track_lat = ds_vert[lat_var].values
        track_lon = ds_vert[lon_var].values
        track_alt = ds_vert[height_var].values
        track_coords = sequence_geo_to_ecef(track_lat, track_lon)

        tree = cKDTree(hgrid_coords)
        dists, idxs = tree.query(track_coords, k=k)

        # Inverse distance weighting: also blend the neighbor height profiles
        # so vertical interpolation uses a consistent height axis.
        if k > 1:
            weights = 1.0 / (dists + eps)
            weights /= np.sum(weights, axis=1, keepdims=True)
            height = np.einsum("ij,ijh->ih", weights, hgrid_alt[idxs])
        else:
            weights = np.ones(idxs.shape)
            height = hgrid_alt[idxs]

        # Handle longitudes separately to account for sign changes at the dateline:
        # average in ECEF space, then convert back to geodetic longitude.
        if xmet_lon_var in vars:
            vars.remove(xmet_lon_var)
        if k > 1:
            new_coords = np.sum(
                hgrid_coords[idxs] * weights.reshape((*weights.shape, 1)), axis=1
            )
        else:
            new_coords = hgrid_coords[idxs]
        new_lons = sequence_ecef_to_geo(
            x=new_coords[:, 0],
            y=new_coords[:, 1],
            z=new_coords[:, 2],
        )[:, 1]
        new_ds_xmet[xmet_lon_var] = xr.DataArray(
            data=new_lons,
            dims=along_track_dim,
            attrs=new_ds_xmet[xmet_lon_var].attrs,
        )

        # Handle all remaining variables
        dims: str | tuple[str, str]
        for var in vars:
            values = ds_xmet[var].values
            if len(values.shape) == 0:
                # Scalars carry no grid information; keep them untouched.
                continue

            if len(values.shape) == 1:
                dims = along_track_dim

                if k > 1:
                    result = np.sum(values[idxs] * weights, axis=1)
                    new_values = result
                else:
                    new_values = values[idxs]
            else:
                dims = (along_track_dim, height_dim)

                if k > 1:
                    result = np.einsum("ij,ijh->ih", weights, values[idxs])
                else:
                    result = values[idxs]

                new_values = np.empty(track_alt.shape)
                new_values[:] = np.nan

                # Vertically interpolate each blended profile onto the track heights.
                # NOTE(review): np.interp assumes `height[i]` is increasing --
                # confirm the XMET height axis ordering.
                for i in np.arange(track_alt.shape[0]):
                    _new_values = np.interp(
                        track_alt[i],
                        height[i],
                        result[i],
                    )

                    new_values[i] = _new_values

            new_var = f"{var}"
            new_ds_xmet[new_var] = (dims, new_values)
            new_ds_xmet[new_var].attrs = ds_xmet[var].attrs

        # Remove original horizontal grid dims and associated variables
        new_ds_xmet = remove_dims(
            new_ds_xmet, [xmet_horizontal_grid_dim, xmet_height_dim]
        )

        return new_ds_xmet

search_files_by_regex

search_files_by_regex(root_dirpath, regex_pattern)

Recursively searches for files in a directory that match a given regex pattern.

Parameters:

Name Type Description Default
root_dirpath str

The root directory to start the search from.

required
regex_pattern str

A regular expression pattern to match file names against.

required
Return

list[str]: A list of absolute file paths that point to files with matching names.

Raises:

Type Description
FileNotFoundError

If the root directory does not exist.

error

If the given pattern is not a valid regular expression.

Source code in earthcarekit/utils/read/search.py
def search_files_by_regex(root_dirpath: str, regex_pattern: str) -> list[str]:
    """
    Recursively searches for files in a directory that match a given regex pattern.

    Args:
        root_dirpath (str): The root directory to start the search from.
        regex_pattern (str): A regular expression pattern to match file names against.

    Return:
        list[str]: A list of absolute file paths that point to files with matching names.

    Raises:
        FileNotFoundError: If the root directory does not exist.
        re.error: If the given pattern is not a valid regular expression.
    """
    if not os.path.exists(root_dirpath):
        raise FileNotFoundError(
            f"{search_files_by_regex.__name__}() Root directory does not exist: {root_dirpath}"
        )

    # Compile once up front: validates the pattern early (raising re.error as
    # documented) and hoists loop-invariant work out of the directory walk.
    pattern = re.compile(regex_pattern)

    filepaths = []
    for dirpath, _, filenames in os.walk(root_dirpath):
        for filename in filenames:
            # Match against the bare file name only, not the full path.
            if pattern.search(filename):
                filepaths.append(os.path.join(dirpath, filename))
    return filepaths

search_product

search_product(
    root_dirpath=None,
    config=None,
    file_type=None,
    agency=None,
    latency=None,
    timestamp=None,
    baseline=None,
    orbit_and_frame=None,
    orbit_number=None,
    frame_id=None,
    filename=None,
    start_time=None,
    end_time=None,
    mode="exhaustive",
)

Searches for EarthCARE product files matching given metadata filters.

Parameters:

Name Type Description Default
root_dirpath str

Root directory to search. Defaults to directory given in a configuration file.

None
config str | ECKConfig | None

Path to a config.toml file or a ECKConfig instance. Defaults to the default configuration file path.

None
file_type str | Sequence[str]

Product file type(s) to match.

None
agency str | Sequence[str]

Producing agency or agencies (e.g. "ESA" or "JAXA").

None
latency str | Sequence[str]

Data latency level(s).

None
timestamp TimestampLike | Sequence

Timestamp(s) included in the product's time coverage.

None
baseline str | Sequence[str]

Baseline version(s).

None
orbit_and_frame str | Sequence[str]

Orbit and frame identifiers.

None
orbit_number int, str, | Sequence

Orbit number(s).

None
frame_id str | Sequence[str]

Frame identifier(s).

None
filename str | Sequence[str]

Specific filename(s) or regular expression patterns to match.

None
start_time TimestampLike

First timestamp included in the product's time coverage.

None
end_time TimestampLike

Last timestamp included in the product's time coverage.

None
mode Literal['exhaustive', 'fast']

Search strategy controlling completeness vs performance; the "exhaustive" mode recursively scans all files under the root_directory, while the "fast" mode searches files only at expected paths and may miss files outside the standard data folder structure defined during the configuration of earthcarekit.

'exhaustive'

Returns:

Name Type Description
ProductDataFrame ProductDataFrame

Filtered list of matching product files as a pandas.DataFrame-based object.

Raises:

Type Description
FileNotFoundError

If root directory does not exist.

Source code in earthcarekit/utils/read/product/_search.py
def search_product(
    root_dirpath: str | None = None,
    config: str | ECKConfig | None = None,
    file_type: str | Sequence[str] | None = None,
    agency: str | Sequence[str] | None = None,
    latency: str | Sequence[str] | None = None,
    timestamp: TimestampLike | Sequence[TimestampLike] | None = None,
    baseline: str | Sequence[str] | None = None,
    orbit_and_frame: str | Sequence[str] | None = None,
    orbit_number: int | str | Sequence[int | str] | None = None,
    frame_id: str | Sequence[str] | None = None,
    filename: str | Sequence[str] | None = None,
    start_time: TimestampLike | None = None,
    end_time: TimestampLike | None = None,
    mode: Literal["exhaustive", "fast"] = "exhaustive",
) -> ProductDataFrame:
    """
    Searches for EarthCARE product files matching given metadata filters.

    Args:
        root_dirpath (str, optional): Root directory to search. Defaults to directory given in a configuration file.
        config (str | ECKConfig | None , optional): Path to a `config.toml` file or a ECKConfig instance. Defaults to the default configuration file path.
        file_type (str | Sequence[str], optional): Product file type(s) to match. An entry may also carry
            its own baseline separated by a colon (i.e. "TYPE:BASELINE"), overriding the `baseline` argument for that entry.
        agency (str | Sequence[str], optional): Producing agency or agencies (e.g. "ESA" or "JAXA").
        latency (str | Sequence[str], optional): Data latency level(s).
        timestamp (TimestampLike | Sequence, optional): Timestamp(s) included in the product's time coverage.
        baseline (str | Sequence[str], optional): Baseline version(s).
        orbit_and_frame (str | Sequence[str], optional): Orbit and frame identifiers.
        orbit_number (int, str, | Sequence, optional): Orbit number(s).
        frame_id (str | Sequence[str], optional): Frame identifier(s).
        filename (str | Sequence[str], optional): Specific filename(s) or regular expression patterns to match.
        start_time (TimestampLike, optional): First timestamp included in the product's time coverage.
        end_time (TimestampLike, optional): Last timestamp included in the product's time coverage.
        mode (Literal["exhaustive", "fast"]): Search strategy controlling completeness vs performance; the "exhaustive" mode
            recursively scans all files under the `root_directory`, while the "fast" mode searches files only at expected paths
            and may miss files outside the standard data folder structure defined during the configuration of earthcarekit.

    Returns:
        ProductDataFrame: Filtered list of matching product files as a `pandas.DataFrame`-based object.

    Raises:
        FileNotFoundError: If root directory does not exist.
    """
    # Resolve configuration: accept a ready ECKConfig instance or load it
    # from the given path (or the default path when config is None).
    if not isinstance(config, ECKConfig):
        config = read_config(config)

    # Fall back to the configured data directory when none was given.
    if not isinstance(root_dirpath, str):
        root_dirpath = config.path_to_data

    if not os.path.exists(root_dirpath):
        raise FileNotFoundError(f"Given root directory does not exist: {root_dirpath}")

    mission_id = "ECA"

    # Normalize file types into a list and pair each entry with a baseline:
    # a "TYPE:BASELINE" entry carries its own baseline, otherwise the
    # `baseline` argument (scalar or parallel list) is used, defaulting to "latest".
    if isinstance(file_type, str):
        file_type = [file_type]
    if isinstance(file_type, Sequence):
        _baseline: list[str] = []
        _file_type: list[str] = []
        for i, ft in enumerate(file_type):
            if isinstance(ft, str):
                _parts = ft.split(":")
                if len(_parts) == 2:
                    _file_type.append(_parts[0])
                    _baseline.append(_parts[1])
                    continue
            _file_type.append(ft)
            if isinstance(baseline, str):
                _baseline.append(baseline)
            elif isinstance(baseline, Sequence):
                try:
                    _baseline.append(baseline[i])
                except IndexError as e:
                    raise IndexError(e, f"given baseline list is too small")
            else:
                _baseline.append("latest")
        file_type = _file_type
        baseline = _baseline
    # Build the "<baseline>_<file_type>" regex fragment of the product file name.
    file_type = _to_file_info_list(file_type, FileType)
    baseline = _format_input(
        baseline,
        file_types=file_type,
        default_input="..",
        format_func=validate_baseline,
    )
    baseline_and_file_type_list = [f"{bl}_{ft}" for bl, ft in zip(baseline, file_type)]
    baseline_and_file_type = _list_to_regex(
        baseline_and_file_type_list, ".._..._..._.."
    )

    # Agency and latency are single characters in the file name; unfiltered
    # fields fall back to the "." (match-any) wildcard.
    agency = _to_file_info_list(agency, FileAgency)
    agency = _list_to_regex(agency, ".")

    latency = _to_file_info_list(latency, FileLatency)
    latency = _list_to_regex(latency, ".")

    # Timestamps (including start/end time) are checked later against each
    # file's time coverage, not encoded into the regex.
    timestamp = _format_input(timestamp, format_func=to_timestamp)
    _start_time = [] if start_time is None else [to_timestamp(start_time)]
    _end_time = [] if end_time is None else [to_timestamp(end_time)]
    timestamp = timestamp + _start_time + _end_time

    orbit_and_frame = _format_input(orbit_and_frame, format_func=format_orbit_and_frame)
    orbit_and_frame = _list_to_regex(orbit_and_frame, "." * 6)

    orbit_number = _format_input(orbit_number, format_func=format_orbit_number)
    orbit_number = _list_to_regex(orbit_number, "." * 5)

    frame_id = _format_input(frame_id, format_func=format_frame_id)
    frame_id = _list_to_regex(frame_id, ".")

    # Combine orbit-number/frame-id filters with any explicit
    # orbit-and-frame filter into one 6-character fragment.
    oaf_list = []
    oaf = ""
    if orbit_number != "." * 5:
        oaf_list.append(orbit_number)
    if frame_id != ".":
        oaf_list.append(frame_id)
    if orbit_number != "." * 5 or frame_id != ".":
        oaf = f"{orbit_number}{frame_id}"

    if oaf == "":
        oaf = orbit_and_frame
    elif oaf != "" and orbit_and_frame != "." * 6:
        # Either combination may match.
        oaf = f"(({oaf})|{orbit_and_frame})"

    # Full EarthCARE product file name pattern (e.g.
    # ECA_<agency><latency><baseline>_<type>_<start>_<stop>_<orbit+frame>.h5).
    pattern = f".*{mission_id}_{agency}{latency}{baseline_and_file_type}_........T......Z_........T......Z_{oaf}.h5"

    files: list[str]
    if pattern == ".*ECA_...._..._..._.._........T......Z_........T......Z_.......h5":
        # Fully unconstrained pattern: skip the expensive scan and match
        # nothing here (explicit filename filters are still handled below).
        files = []
    elif mode == "fast" and len(file_type) > 0:
        # Fast mode: only look in the expected <level>/<file_type>[/<date>]
        # subdirectories instead of walking the whole tree.
        files = []
        for ft in file_type:
            lvl = FileType.from_input(ft).get_level()
            _lvl_subdir = ""
            if lvl == "1B":
                _lvl_subdir = config.subdir_name_level1b
            elif lvl == "1C":
                _lvl_subdir = config.subdir_name_level1c
            elif lvl == "1D":
                _lvl_subdir = config.subdir_name_auxiliary_files
            elif lvl == "2A":
                _lvl_subdir = config.subdir_name_level2a
            elif lvl == "2B":
                _lvl_subdir = config.subdir_name_level2b
            else:
                raise ValueError(
                    f"file type '{ft}' not supported for search mode '{mode}'"
                )
            _root_dirpath = os.path.join(root_dirpath, _lvl_subdir, ft)

            # Narrow down further to a date subdirectory when possible.
            if start_time is not None:
                _date_subdir = _get_date_subdir(start_time, end_time)
                if isinstance(_date_subdir, str):
                    _root_dirpath = os.path.join(
                        root_dirpath, _lvl_subdir, ft, _date_subdir
                    )

            if os.path.exists(_root_dirpath):
                print(f"Searching data at <{_root_dirpath}>")
                _files = search_files_by_regex(_root_dirpath, pattern)
            else:
                _files = []

            files.extend(_files)
    else:
        files = search_files_by_regex(root_dirpath, pattern)

    # Turn each given filename (or pattern) into a regex matched anywhere
    # in the tree; the ".h5" suffix is stripped so baseline/extension-less
    # inputs still match.
    if isinstance(filename, str) or isinstance(filename, Sequence):
        if isinstance(filename, str):
            filename = [filename]
        _get_pattern = lambda fn: f".*{os.path.basename(fn).replace('.h5', '')}.*.h5"
        filename = [_get_pattern(fn) for fn in filename]
    elif filename is None:
        filename = []
    else:
        raise TypeError(
            f"Given filename has invalid type ({type(filename)}: (unknown))"
        )

    for fn in filename:
        new_files = search_files_by_regex(root_dirpath, fn)
        files.extend(new_files)

    # Remove duplicates
    files = list(set(files))

    # Keep only files whose time coverage contains at least one of the
    # requested timestamps (start/end times were appended above).
    old_files = files.copy()
    if len(timestamp) > 0:
        files = []
        for t in timestamp:
            new_files = [
                f for f in old_files if _check_product_contains_timestamp(f, t)
            ]
            if len(new_files) > 0:
                files.extend(new_files)

    pdf = get_product_infos(files)

    # Additionally include files overlapping the [start_time, end_time] range.
    if start_time is not None or end_time is not None:
        _pdf = get_product_infos(old_files)
        _pdf = filter_time_range(_pdf, start_time=start_time, end_time=end_time)

        if not pdf.empty and not _pdf.empty:
            pdf = ProductDataFrame(pd.concat([pdf, _pdf], ignore_index=True))
        elif not _pdf.empty:
            pdf = _pdf

    # Stable presentation order, duplicates removed, fresh index.
    pdf = pdf.sort_values(by=["orbit_and_frame", "file_type", "start_processing_time"])
    pdf = pdf.drop_duplicates()
    pdf = pdf.reset_index(drop=True)

    pdf.validate_columns()
    return pdf

trim_to_latitude_frame_bounds

trim_to_latitude_frame_bounds(
    ds,
    along_track_dim=ALONG_TRACK_DIM,
    lat_var=TRACK_LAT_VAR,
    frame_id=None,
    add_trim_index_offset_var=True,
    trim_index_offset_var="trim_index_offset",
)

Trims the dataset to the region within the latitude frame bounds.

Parameters:

Name Type Description Default
ds Dataset

Input dataset to be trimmed.

required
along_track_dim str

Dimension along which to trim. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
lat_var str

Name of the latitude variable. Defaults to TRACK_LAT_VAR.

TRACK_LAT_VAR
frame_id str | None

EarthCARE frame ID (single character between "A" and "H"). If given, speeds up trimming. Defaults to None.

None
add_trim_index_offset_var bool

Whether the index offset between the original and trimmed dataset is stored in the trimmed dataset (variable: "trim_index_offset"). Defaults to True.

True

Returns:

Type Description
Dataset

xarray.Dataset: Trimmed dataset.

Source code in earthcarekit/utils/read/product/_trim_to_frame.py
def trim_to_latitude_frame_bounds(
    ds: Dataset,
    along_track_dim: str = ALONG_TRACK_DIM,
    lat_var: str = TRACK_LAT_VAR,
    frame_id: str | None = None,
    add_trim_index_offset_var: bool = True,
    trim_index_offset_var: str = "trim_index_offset",
) -> Dataset:
    """
    Trims the dataset to the region within the latitude frame bounds.

    Args:
        ds (xarray.Dataset):
            Input dataset to be trimmed.
        along_track_dim (str, optional):
            Dimension along which to trim. Defaults to ALONG_TRACK_DIM.
        lat_var (str, optional):
            Name of the latitude variable. Defaults to TRACK_LAT_VAR.
        frame_id (str | None, optional):
            EarthCARE frame ID (single character between "A" and "H").
            If given, speeds up trimming. Defaults to None.
        add_trim_index_offset_var (bool, optional):
            Whether the index offset between the original and trimmed dataset
            is stored in the trimmed dataset. Defaults to True.
        trim_index_offset_var (str, optional):
            Name of the variable that stores the index offset.
            Defaults to "trim_index_offset".

    Returns:
        xarray.Dataset: Trimmed dataset.
    """
    # Determine the along-track index bounds of the latitude frame.
    bounds = get_frame_along_track(
        ds,
        lat_var=lat_var,
        frame_id=frame_id,
    )
    trimmed = ds.isel({along_track_dim: slice(*bounds)})

    # Record the offset only when requested and the trim actually removed
    # a leading section (offset zero carries no information).
    if not add_trim_index_offset_var or bounds[0] <= 0:
        return trimmed

    trimmed = insert_var(
        ds=trimmed,
        var=trim_index_offset_var,
        data=int(bounds[0]),
        index=0,
        after_var="processing_start_time",
    )
    trimmed[trim_index_offset_var] = trimmed[trim_index_offset_var].assign_attrs(
        {
            "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
        }
    )
    return trimmed