Skip to content

API reference

earthcarekit.utils.xarray_utils

earthcarekit.utils.xarray_utils

Utilities based on xarray.


concat_datasets

concat_datasets(ds1: Dataset, ds2: Dataset, dim: str) -> Dataset

Concatenate two xarray.Dataset objects along a specified dimension, padding other dimensions to match.

Pads all non-concatenation dimensions in both datasets to the maximum size among them (if they differ) before concatenating. Integer variables are padded with -9999 or a data type-specific minimum value (e.g., -128 for int8); non-integer variables are padded with NaN.

Parameters:

Name Type Description Default
ds1 Dataset

The first dataset to concatenate.

required
ds2 Dataset

The second dataset to concatenate.

required
dim str

The name of the dimension to concatenate along.

required

Returns:

Name Type Description
Dataset Dataset

A new dataset resulting from the concatenation.

Source code in earthcarekit/utils/xarray_utils/_concat.py
def concat_datasets(ds1: Dataset, ds2: Dataset, dim: str) -> Dataset:
    """Concatenate two `xarray.Dataset` objects along a given dimension, padding the others to match.

    Before concatenating, every dimension other than `dim` is padded in both
    datasets to the largest size found among them (if they differ). Integer
    variables are padded with -9999 or a dtype-specific minimum value
    (e.g., -128 for int8); non-integer variables are padded with NaN.

    Args:
        ds1 (Dataset): The first dataset to concatenate.
        ds2 (Dataset): The second dataset to concatenate.
        dim (str): The name of the dimension to concatenate along.

    Returns:
        Dataset: A new dataset resulting from the concatenation.
    """

    def _scalar_names(ds: xr.Dataset) -> list:
        # Names of all 0-dimensional data variables.
        return [name for name, da in ds.data_vars.items() if da.ndim == 0]

    scalar_vars: list = list(set(_scalar_names(ds1) + _scalar_names(ds2)))

    # Collect the scalar values of both datasets, in (ds1, ds2) order.
    scalar_data: dict = {}
    for name in scalar_vars:
        values: list = []
        for ds in (ds1, ds2):
            if name in ds:
                values.extend(np.atleast_1d(ds[name].values))
        scalar_data[name] = values

    # Target sizes: the maximum of both datasets for every non-concat dimension.
    all_dims = set(ds1.dims) | set(ds2.dims)
    max_dim_sizes = {
        d: max(ds1.sizes.get(d, 0), ds2.sizes.get(d, 0)) for d in all_dims if d != dim
    }

    ds_combined = xr.concat(
        [pad_dataset(ds1, max_dim_sizes), pad_dataset(ds2, max_dim_sizes)],
        dim=dim,
        data_vars="all",
    )

    # Scalars are re-created on a fresh "concat_dim" below, so remove any
    # existing one (e.g., from a previous concatenation) first.
    if "concat_dim" in ds_combined.dims:
        ds_combined = ds_combined.drop_dims("concat_dim", errors="ignore")

    for name in scalar_vars:
        ds_combined[name] = xr.DataArray(scalar_data[name], dims=["concat_dim"])

    # Keep track of the original file paths, when known.
    sources = [
        s
        for s in (ds1.encoding.get("source"), ds2.encoding.get("source"))
        if isinstance(s, str)
    ]
    if sources:
        ds_combined.encoding["sources"] = sources

    return ds_combined

convert_scalar_var_to_str

convert_scalar_var_to_str(ds: Dataset, var: str) -> Dataset

Converts a given scalar variable inside an xarray.Dataset to string.

Source code in earthcarekit/utils/xarray_utils/_scalars.py
def convert_scalar_var_to_str(ds: xr.Dataset, var: str) -> xr.Dataset:
    """Converts a given scalar variable inside a `xarray.Dataset` to string.

    Bytes values are UTF-8 decoded; any other value is passed through `str`.
    """
    raw = ds[var].item()
    text = raw.decode("utf-8") if isinstance(raw, bytes) else str(raw)
    ds[var] = xr.DataArray(text)
    return ds

filter_index

filter_index(
    ds: Dataset,
    index: int | slice | NDArray | Sequence,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
) -> Dataset

Filters a dataset given an along-track index number, list/array or range/slice.

Parameters:

Name Type Description Default
ds Dataset

Input dataset with along-track dimension.

required
index int | slice | NDArray

Index(es) to filter.

required
along_track_dim str

Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
pad_idxs int

Number of additional samples added at both sides of the selection. This input is ignored when index is an array-like. Defaults to 0.

0

Returns:

Name Type Description
Dataset Dataset

Filtered dataset.

Examples:

>>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
>>> with eck.read_product(fp) as ds:
>>>     ds_filtered = eck.filter_index(ds, 123)
>>>     print(ds_filtered.sizes)
Frozen({'along_track': 1, 'vertical': 218})
>>>         ds_filtered = eck.filter_index(ds, slice(0, 1000))
>>>         print(ds_filtered.sizes)
Frozen({'along_track': 1000, 'vertical': 218})
>>>         ds_filtered = eck.filter_index(ds, (0, 1000))
>>>         print(ds_filtered.sizes)
Frozen({'along_track': 2, 'vertical': 218})
Source code in earthcarekit/utils/xarray_utils/_filter_index.py
def filter_index(
    ds: Dataset,
    index: int | slice | NDArray | Sequence,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
) -> Dataset:
    """
    Filters a dataset given an along-track index number, list/array or range/slice.

    Args:
        ds (Dataset): Input dataset with along-track dimension.
        index (int | slice | NDArray | Sequence): Index(es) to filter.
        along_track_dim (str, optional): Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.
        trim_index_offset_var (str, optional): Dataset variable keeping track of index offsets
            caused by dataset trimming/filtering. Defaults to "trim_index_offset".
        pad_idxs (int, optional): Number of additional samples added at both sides of the selection.
            This input is ignored when `index` is an array-like. Defaults to 0.

    Returns:
        Dataset: Filtered dataset.

    Examples:
        ```python
        >>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
        >>> with eck.read_product(fp) as ds:
        >>>     ds_filtered = eck.filter_index(ds, 123)
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 1, 'vertical': 218})
        >>>     ds_filtered = eck.filter_index(ds, slice(0, 1000))
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 1000, 'vertical': 218})
        >>>     ds_filtered = eck.filter_index(ds, (0, 1000))
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 2, 'vertical': 218})
        ```
    """
    # Collapse 0-d arrays and single-element sequences to a plain int.
    if isinstance(index, np.ndarray) and len(index.shape) == 0:
        index = int(index)
    elif isinstance(index, (Sequence, np.ndarray)):
        if len(index) == 0:
            raise ValueError("index must be integer or non-empty array")
        elif len(index) == 1:
            index = int(index[0])

    if isinstance(index, int):
        index = slice(index, index + 1)

    if isinstance(index, slice):
        # Resolve open slice ends so the padding arithmetic below is
        # well-defined (previously e.g. `slice(None, 100)` raised a
        # TypeError here), and clamp the padded start at 0: a negative
        # start would be interpreted by `.isel` as counting from the end
        # of the dimension, silently selecting a wrong or empty range.
        start = 0 if index.start is None else index.start
        stop = ds.sizes[along_track_dim] if index.stop is None else index.stop
        index = slice(max(0, start - pad_idxs), stop + pad_idxs, index.step)
    else:
        index = flatten_array(index)

    ds_new = ds.copy().isel({along_track_dim: index})

    # Offset(s) mapping the filtered indices back to the original dataset.
    new_trim_index_offset: int | NDArray = 0

    if isinstance(index, slice):
        if isinstance(index.step, int) and index.step > 1:
            # Strided selection: a single scalar offset is not enough,
            # so store the original index of every selected sample.
            new_trim_index_offset = np.array(
                list(range(index.start, index.stop, index.step))
            )
        else:
            new_trim_index_offset = int(index.start)

    if isinstance(index, np.ndarray):
        if np.max(np.diff(index)) > 1:
            # Non-contiguous selection: keep the full index array.
            new_trim_index_offset = index
        else:
            new_trim_index_offset = int(index[0])

    if trim_index_offset_var in ds_new:
        # Accumulate with any offset left over from previous trims.
        old_trim_index_offset = ds_new[trim_index_offset_var].values
        trim_index_offset = np.asarray(old_trim_index_offset + new_trim_index_offset)

        if len(trim_index_offset.shape) == 0:
            ds_new[trim_index_offset_var] = trim_index_offset
        else:
            ds_new[trim_index_offset_var] = ("new_dim", trim_index_offset)
    else:
        ds_new = insert_var(
            ds=ds_new,
            var=trim_index_offset_var,
            data=new_trim_index_offset,
            index=0,
            after_var="processing_start_time",
        )
        ds_new[trim_index_offset_var] = ds_new[trim_index_offset_var].assign_attrs(
            {
                "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
            }
        )

    return ds_new

filter_latitude

filter_latitude(
    ds: Dataset,
    lat_range: NumericPairNoneLike,
    start_before_pole: bool = True,
    end_before_pole: bool = True,
    only_center: bool = False,
    lat_var: str = TRACK_LAT_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> Dataset

Filters a dataset to include only points within a specified latitude range.

Parameters:

Name Type Description Default
ds Dataset

Input dataset with geolocation data.

required
lat_range NumericPairNoneLike

A pair of latitude values (min_lat, max_lat) defining the selection range.

required
start_before_pole bool

If True, selection starts before the pole when the track crosses one. Defaults to True.

True
end_before_pole bool

If True, selection ends before the pole when the track crosses one. Defaults to True.

True
only_center bool

If True, only the sample at the center index of selection is returned. Defaults to False.

False
lat_var str

Name of the latitude variable. Defaults to TRACK_LAT_VAR.

TRACK_LAT_VAR
along_track_dim str

Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
pad_idxs int

Number of additional samples added at both sides of the selection. Defaults to 0.

0
shift_idxs int

Offset number to shift selection of samples. Defaults to 0.

0

Raises:

Type Description
ValueError

If selection is empty.

Returns:

Type Description
Dataset

xr.Dataset: Filtered dataset containing only points within the specified latitude range.

Examples:

>>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
>>> with eck.read_product(fp) as ds:
>>>     print(ds.latitude.values)
[-22.50316844 -22.51202978 -22.52089178 ... -67.48243216 -67.49074691 -67.49906148]
>>>     ds_filtered = eck.filter_latitude(ds, (-40, -30))
>>>     print(ds_filtered.latitude.values)
[-30.0036885  -30.01258957 -30.02149091 ... -39.98112826 -39.98962597 -39.99812425]
Source code in earthcarekit/utils/xarray_utils/_filter_latitude.py
def filter_latitude(
    ds: xr.Dataset,
    lat_range: NumericPairNoneLike,
    start_before_pole: bool = True,
    end_before_pole: bool = True,
    only_center: bool = False,
    lat_var: str = TRACK_LAT_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> xr.Dataset:
    """
    Filters a dataset to include only points within a specified latitude range.

    Args:
        ds (xr.Dataset): Input dataset with geolocation data.
        lat_range (NumericPairNoneLike): A pair of latitude values (min_lat, max_lat) defining the selection range.
        start_before_pole (bool, optional): If True, selection starts before the pole when the track crosses one. Defaults to True.
        end_before_pole (bool, optional): If True, selection ends before the pole when the track crosses one. Defaults to True.
        only_center (bool, optional): If True, only the sample at the center index of selection is returned. Defaults to False.
        lat_var (str, optional): Name of the latitude variable. Defaults to TRACK_LAT_VAR.
        along_track_dim (str, optional): Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.
        trim_index_offset_var (str, optional): Dataset variable keeping track of index offsets caused by dataset trimming/filtering. Defaults to "trim_index_offset".
        pad_idxs (int, optional): Number of additional samples added at both sides of the selection. Defaults to 0.
        shift_idxs (int, optional): Offset number to shift selection of samples. Defaults to 0.

    Raises:
        ValueError: If selection is empty.

    Returns:
        xr.Dataset: Filtered dataset containing only points within the specified latitude range.

    Examples:
        ```python
        >>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
        >>> with eck.read_product(fp) as ds:
        >>>     print(ds.latitude.values)
        [-22.50316844 -22.51202978 -22.52089178 ... -67.48243216 -67.49074691 -67.49906148]
        >>>     ds_filtered = eck.filter_latitude(ds, (-40, -30))
        >>>     print(ds_filtered.latitude.values)
        [-30.0036885  -30.01258957 -30.02149091 ... -39.98112826 -39.98962597 -39.99812425]
        ```
    """
    lats = ds[lat_var].values

    satellite_crosses_pole, is_first_increase, mask_before_pole, mask_after_pole = (
        _get_pole_crossing_masks(ds, lat_var=lat_var)
    )

    # Missing/None range bounds presumably fall back to the track's first and
    # last latitude (see `fallback`) — behavior of validate_numeric_pair.
    lat_range = validate_numeric_pair(lat_range, fallback=(lats[0], lats[-1]))

    # Samples lying inside the closed latitude interval, ignoring pole crossing.
    lats_mask: NDArray[np.bool_] = (lats >= np.min(lat_range)) & (lats <= np.max(lat_range))

    if satellite_crosses_pole and start_before_pole and not end_before_pole:
        # Selection spans the pole: keep samples from the range start up to the
        # pole, plus samples from the pole onward to the range end. Comparison
        # direction depends on whether latitude initially increases along track.
        if is_first_increase:
            mask_from_start = lats >= lat_range[0]
            mask_from_end = lats >= lat_range[1]
        else:
            mask_from_start = lats <= lat_range[0]
            mask_from_end = lats <= lat_range[1]

        mask_from_start_before_pole = np.logical_and(mask_before_pole, mask_from_start)
        mask_from_end_after_pole = np.logical_and(mask_after_pole, mask_from_end)

        mask = np.logical_or(mask_from_start_before_pole, mask_from_end_after_pole)
    elif satellite_crosses_pole and start_before_pole and end_before_pole:
        # Whole selection lies before the pole crossing.
        mask = np.logical_and(lats_mask, mask_before_pole)
    elif satellite_crosses_pole and not start_before_pole:
        # Whole selection lies after the pole crossing.
        mask = np.logical_and(lats_mask, mask_after_pole)
    else:
        mask = lats_mask

    if only_center:
        # Reduce the selection to its single central sample.
        mask_true_idxs = np.where(mask)[0]
        if len(mask_true_idxs) > 0:
            idx_center = mask_true_idxs[len(mask_true_idxs) // 2]
            mask[:] = False
            mask[idx_center] = True

    mask = pad_true_sequence(mask, pad_idxs)
    mask = shift_true_sequence(mask, shift_idxs)

    if np.sum(mask) == 0:
        msg = f"No data falls into the given latitude range!\nIn the dataset latitude falls between {np.min(lats)} and {np.max(lats)}.\n"
        if satellite_crosses_pole:
            msg += "Note that the satellite crosses a pole (set `start_before_pole` and `end_before_pole`\nto clarify how the start and end of the range should be interpreted)."
        else:
            msg += "The satellite is not crossing a pole."
        raise ValueError(msg)

    da_mask: xr.DataArray = xr.DataArray(mask, dims=[along_track_dim], name=lat_var)

    # Rebuild the dataset: variables carrying the along-track dimension are
    # masked (and dropped where False); all others are copied unchanged.
    ds_new: xr.Dataset = xr.Dataset(
        {
            var: (
                ds[var].copy().where(da_mask, drop=True)
                if along_track_dim in ds[var].dims
                else ds[var].copy()
            )
            for var in ds.data_vars
        }
    )
    ds_new.attrs = ds.attrs.copy()
    ds_new.encoding = ds.encoding.copy()

    # Original-array index of the first selected sample (first True in mask).
    new_trim_index_offset: int = int(np.argmax(mask))
    if trim_index_offset_var in ds_new:
        # Accumulate with any offset left over from previous trims.
        old_trim_index_offset = int(ds_new[trim_index_offset_var].values)
        trim_index_offset = old_trim_index_offset + new_trim_index_offset
        ds_new[trim_index_offset_var].values = np.asarray(trim_index_offset)
    else:
        ds_new = insert_var(
            ds=ds_new,
            var=trim_index_offset_var,
            data=new_trim_index_offset,
            index=0,
            after_var="processing_start_time",
        )
        ds_new[trim_index_offset_var] = ds_new[trim_index_offset_var].assign_attrs(
            {
                "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
            }
        )

    return ds_new

filter_radius

filter_radius(
    ds: Dataset,
    radius_km: float = 100.0,
    center_lat: float | None = None,
    center_lon: float | None = None,
    site: GroundSite | str | None = None,
    lat_var: str = TRACK_LAT_VAR,
    lon_var: str = TRACK_LON_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    method: Literal["geodesic", "haversine"] = "geodesic",
    closest: bool = False,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> Dataset

Filters a dataset to include only points within a specified radius of a geographic location.

Parameters:

Name Type Description Default
ds Dataset

Input dataset with geolocation data.

required
radius_km float

Radius (in kilometers) around the center location.

100.0
site GroundSite or str

GroundSite object or name from which center location will be retrieved, alternatively center_lat and center_lon must be set.

None
center_lat float

Latitude of the center point, alternatively site must be set.

None
center_lon float

Longitude of the center point, alternatively site must be set.

None
lat_var str

Name of the latitude variable. Defaults to TRACK_LAT_VAR.

TRACK_LAT_VAR
lon_var str

Name of the longitude variable. Defaults to TRACK_LON_VAR.

TRACK_LON_VAR
along_track_dim str

Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
method Literal['geodesic', 'haversine']

Distance calculation method. Defaults to "geodesic".

'geodesic'
closest bool

If True, only the single closest sample is returned, otherwise all samples within radius. Defaults to False.

False
trim_index_offset_var str

dataset variable keeping track of index offsets caused by dataset trimming/filtering. Defaults to "trim_index_offset".

'trim_index_offset'
pad_idxs int

Number of additional samples added at both sides of the selection. Defaults to 0.

0
shift_idxs int

Offset number to shift selection of samples. Defaults to 0.

0

Returns:

Type Description
Dataset

xr.Dataset: Filtered dataset containing only points within the specified radius.

Raises:

Type Description
EmptyFilterResultError

If no data points are found within the radius.

ValueError

If the method is invalid.

Examples:

>>> fp = "ECA_EXBB_ATL_EBD_2A_20240902T210023Z_20251107T142547Z_01508B.h5"
>>> with eck.read_product(fp) as ds:
>>>     print(ds.sizes)
Frozen({'along_track': 5143, 'vertical': 242, 'layer': 25, 'n_state': 351})
>>>     ds_filtered = eck.filter_radius(ds, site="dushanbe")
>>>     print(ds_filtered.sizes)
Frozen({'along_track': 197, 'vertical': 242, 'layer': 25, 'n_state': 351})
>>>     ds_filtered = eck.filter_radius(ds, site="dushanbe", radius_km=200)
>>>     print(ds_filtered.sizes)
Frozen({'along_track': 399, 'vertical': 242, 'layer': 25, 'n_state': 351})
Source code in earthcarekit/utils/xarray_utils/_filter_radius.py
def filter_radius(
    ds: xr.Dataset,
    radius_km: float = 100.0,
    center_lat: float | None = None,
    center_lon: float | None = None,
    site: GroundSite | str | None = None,
    lat_var: str = TRACK_LAT_VAR,
    lon_var: str = TRACK_LON_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    method: Literal["geodesic", "haversine"] = "geodesic",
    closest: bool = False,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> xr.Dataset:
    """
    Filters a dataset to include only points within a specified radius of a geographic location.

    Args:
        ds (xr.Dataset): Input dataset with geolocation data.
        radius_km (float): Radius (in kilometers) around the center location.
        site (GroundSite or str, optional): GroundSite object or name from which center location will be retrieved,
            alternatively `center_lat` and `center_lon` must be set.
        center_lat (float, optional): Latitude of the center point,
            alternatively `site` must be set.
        center_lon (float, optional): Longitude of the center point,
            alternatively `site` must be set.
        lat_var (str, optional): Name of the latitude variable. Defaults to TRACK_LAT_VAR.
        lon_var (str, optional): Name of the longitude variable. Defaults to TRACK_LON_VAR.
        along_track_dim (str, optional): Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.
        method (Literal["geodesic", "haversine"], optional): Distance calculation method. Defaults to "geodesic".
        closest (bool, optional): If True, only the single closest sample is returned, otherwise all samples within radius. Defaults to False.
        trim_index_offset_var (str, optional): Dataset variable keeping track of index offsets caused by dataset trimming/filtering. Defaults to "trim_index_offset".
        pad_idxs (int, optional): Number of additional samples added at both sides of the selection. Defaults to 0.
        shift_idxs (int, optional): Offset number to shift selection of samples. Defaults to 0.

    Returns:
        xr.Dataset: Filtered dataset containing only points within the specified radius.

    Raises:
        EmptyFilterResultError: If no data points are found within the radius.
        ValueError: If the `method` is invalid.

    Examples:
        ```python
        >>> fp = "ECA_EXBB_ATL_EBD_2A_20240902T210023Z_20251107T142547Z_01508B.h5"
        >>> with eck.read_product(fp) as ds:
        >>>     print(ds.sizes)
        Frozen({'along_track': 5143, 'vertical': 242, 'layer': 25, 'n_state': 351})
        >>>     ds_filtered = eck.filter_radius(ds, site="dushanbe")
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 197, 'vertical': 242, 'layer': 25, 'n_state': 351})
        >>>     ds_filtered = eck.filter_radius(ds, site="dushanbe", radius_km=200)
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 399, 'vertical': 242, 'layer': 25, 'n_state': 351})
        ```
    """
    _center_lat: float
    _center_lon: float

    # Resolve the center location from either `site` or explicit coordinates.
    if isinstance(site, str):
        site = get_ground_site(site)

    if isinstance(site, GroundSite):
        _center_lat = site.latitude
        _center_lon = site.longitude
    elif isinstance(center_lat, (int, float, np.integer, np.floating)) and isinstance(
        center_lon, (int, float, np.integer, np.floating)
    ):
        _center_lat = float(center_lat)
        _center_lon = float(center_lon)
    else:
        raise ValueError("Either 'site' or 'center_lat' and 'center_lon' must be given.")

    if method not in ["geodesic", "haversine"]:
        raise ValueError(r'Invalid method chosen. Available methods: {"geodesic", "haversine"}')

    satellite_coords = get_coords(ds, lat_var=lat_var, lon_var=lon_var)

    center_coords = (_center_lat, _center_lon)

    if method == "geodesic":
        distances = geodesic(center_coords, satellite_coords)
    else:
        distances = haversine(center_coords, satellite_coords)

    mask = np.array(distances < radius_km)

    if closest:
        # Reduce the selection to the single closest sample.
        closest_distance = np.min(distances)
        closest_filtered_index = int(np.argmin(np.abs(distances - closest_distance)))
        mask[:] = False
        mask[closest_filtered_index] = True

    mask = pad_true_sequence(mask, pad_idxs)
    mask = shift_true_sequence(mask, shift_idxs)

    da_mask = xr.DataArray(data=mask, dims=[along_track_dim])
    if np.sum(da_mask.values) < 1:
        # Report the resolved center coordinates: `center_lat`/`center_lon`
        # are None when the center was given via `site`, which previously
        # produced messages like "around (None degN None degE)".
        raise EmptyFilterResultError(
            f"Could not find valid overpass for given inputs. Data lies outside the given {radius_km} km radius around ({_center_lat} degN {_center_lon} degE).",
            min_distance=float(np.min(distances)),
        )

    # Rebuild the dataset: variables carrying the along-track dimension are
    # masked (and dropped where False); all others are copied unchanged.
    ds_new: xr.Dataset = xr.Dataset(
        {
            var: (
                ds[var].copy().where(da_mask, drop=True)
                if along_track_dim in ds[var].dims
                else ds[var].copy()
            )
            for var in ds.data_vars
        }
    )
    ds_new.attrs = ds.attrs.copy()
    ds_new.encoding = ds.encoding.copy()

    # Original-array index of the first selected sample (first True in mask).
    new_trim_index_offset: int = int(np.argmax(mask))
    if trim_index_offset_var in ds_new:
        # Accumulate with any offset left over from previous trims.
        old_trim_index_offset = int(ds_new[trim_index_offset_var].values)
        trim_index_offset = old_trim_index_offset + new_trim_index_offset
        ds_new[trim_index_offset_var].values = np.asarray(trim_index_offset)
    else:
        ds_new = insert_var(
            ds=ds_new,
            var=trim_index_offset_var,
            data=new_trim_index_offset,
            index=0,
            after_var="processing_start_time",
        )
        ds_new[trim_index_offset_var] = ds_new[trim_index_offset_var].assign_attrs(
            {
                "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
            }
        )
    return ds_new

filter_time

filter_time(
    ds: Dataset,
    time_range: TimeRangeLike | Iterable | None = None,
    timestamp: TimestampLike | None = None,
    only_center: bool = False,
    time_var: str = TIME_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> Dataset

Filters an xarray Dataset to include only samples within a given time range.

Parameters:

Name Type Description Default
ds Dataset

The input dataset containing a time coordinate.

required
time_range TimeRangeLike | Iterable | None

Start and end time of the range to filter, as strings or pandas timestamps. Defaults to None.

None
timestamp TimestampLike | None

A single timestamp for which the closest sample to return. Defaults to None.

None
only_center bool

If True, only the sample at the center index of selection is returned. Defaults to False.

False
time_var str

Name of the time variable in ds. Defaults to TIME_VAR.

TIME_VAR
along_track_dim str

Dimension name along which time is defined. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
pad_idxs int

Number of additional samples added at both sides of the selection. Defaults to 0.

0
shift_idxs int

Offset number to shift selection of samples. Defaults to 0.

0

Returns:

Type Description
Dataset

xr.Dataset: Subset of ds containing only samples within the specified time range.

Examples:

>>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
>>> with eck.read_product(fp) as ds:
>>>     print(ds.time.values[[0, -1]])
['2026-01-08T03:04:08.393852288' '2026-01-08T03:15:57.401298304']
>>>     ds_filtered = eck.filter_time(ds, time_range=("2026-01-08 03:10", "2026-01-08 03:12"))
>>>     print(ds_filtered.time.values[[0, -1]])
['2026-01-08T03:10:00.115605248' '2026-01-08T03:11:59.985651712']
Source code in earthcarekit/utils/xarray_utils/_filter_time.py
def filter_time(
    ds: xr.Dataset,
    time_range: TimeRangeLike | Iterable | None = None,
    timestamp: TimestampLike | None = None,
    only_center: bool = False,
    time_var: str = TIME_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> xr.Dataset:
    """
    Filters an xarray Dataset to include only samples within a given time range.

    Args:
        ds (xr.Dataset): The input dataset containing a time coordinate.
        time_range (TimeRangeLike | Iterable | None):
            Start and end time of the range to filter, as strings or pandas timestamps. Defaults to None.
        timestamp (TimestampLike | None): A single timestamp for which the closest sample to return. Defaults to None.
        only_center (bool, optional): If True, only the sample at the center index of selection is returned. Defaults to False.
        time_var (str, optional): Name of the time variable in `ds`. Defaults to TIME_VAR.
        along_track_dim (str, optional): Dimension name along which time is defined. Defaults to ALONG_TRACK_DIM.
        trim_index_offset_var (str, optional): Dataset variable keeping track of index offsets caused by dataset trimming/filtering. Defaults to "trim_index_offset".
        pad_idxs (int, optional): Number of additional samples added at both sides of the selection. Defaults to 0.
        shift_idxs (int, optional): Offset number to shift selection of samples. Defaults to 0.

    Raises:
        ValueError: If both `time_range` and `timestamp` are given, or if the selection is empty.

    Returns:
        xr.Dataset: Subset of `ds` containing only samples within the specified time range.

    Examples:
        ```python
        >>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
        >>> with eck.read_product(fp) as ds:
        >>>     print(ds.time.values[[0, -1]])
        ['2026-01-08T03:04:08.393852288' '2026-01-08T03:15:57.401298304']
        >>>     ds_filtered = eck.filter_time(ds, time_range=("2026-01-08 03:10", "2026-01-08 03:12"))
        >>>     print(ds_filtered.time.values[[0, -1]])
        ['2026-01-08T03:10:00.115605248' '2026-01-08T03:11:59.985651712']
        ```
    """
    if time_range is not None and timestamp is not None:
        raise ValueError("Can not use both arguments time_range and timestamp at the same time.")

    # Boolean along-track mask of the selected samples.
    mask = get_filter_time_mask(
        ds=ds,
        time_range=time_range,
        timestamp=timestamp,
        only_center=only_center,
        time_var=time_var,
        pad_idxs=pad_idxs,
        shift_idxs=shift_idxs,
    )

    if np.sum(mask) == 0:
        times = ds[time_var].values
        msg = (
            f"No data falls into the given time range!\n"
            f"In the dataset time ranges from {times[0]} to {times[-1]}.\n"
        )
        raise ValueError(msg)

    da_mask: xr.DataArray = xr.DataArray(mask, dims=[along_track_dim], name=time_var)

    # Rebuild the dataset: variables carrying the along-track dimension are
    # masked (and dropped where False); all others are copied unchanged.
    ds_new: xr.Dataset = xr.Dataset(
        {
            var: (
                ds[var].copy().where(da_mask, drop=True)
                if along_track_dim in ds[var].dims
                else ds[var].copy()
            )
            for var in ds.data_vars
        }
    )
    ds_new.attrs = ds.attrs.copy()
    ds_new.encoding = ds.encoding.copy()

    # Original-array index of the first selected sample (first True in mask).
    new_trim_index_offset: int = int(np.argmax(mask))
    if trim_index_offset_var in ds_new:
        if len(ds_new[trim_index_offset_var].values.shape) != 0:
            # NOTE(review): an array-valued offset (e.g. left by a strided
            # filter_index) is collapsed to its first element here — verify
            # this is the intended behavior for non-contiguous selections.
            ds_new[trim_index_offset_var] = (
                [],
                ds_new[trim_index_offset_var].values[0],
            )
        # Accumulate with any offset left over from previous trims.
        old_trim_index_offset = int(ds_new[trim_index_offset_var].values)
        trim_index_offset = old_trim_index_offset + new_trim_index_offset
        ds_new[trim_index_offset_var].values = np.asarray(trim_index_offset)
    else:
        ds_new = insert_var(
            ds=ds_new,
            var=trim_index_offset_var,
            data=new_trim_index_offset,
            index=0,
            after_var="processing_start_time",
        )
        ds_new[trim_index_offset_var] = ds_new[trim_index_offset_var].assign_attrs(
            {
                "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
            }
        )

    return ds_new

insert_var

insert_var(
    ds: Dataset,
    var: str,
    data: Any,
    index: int | None = None,
    before_var: str | None = None,
    after_var: str | None = None,
) -> Dataset

Inserts a new variable in a xarray.Dataset before or after a given variable or at a given index.

Parameters:

Name Type Description Default
ds Dataset

The original dataset to which the variable will be added.

required
var str

Name of the new variable to be added.

required
data Any

Data stored in the new variable.

required
index int | None

Index at which the new variable will be added. Will be ignored when either before_var or after_var are given and valid. Defaults to None.

None
before_var str | None

Name of the variable before which the new variable should be inserted. Defaults to None.

None
after_var str | None

Name of the variable after which the new variable should be inserted. Will be ignored when before_var is given and valid. Defaults to None.

None

Returns:

Name Type Description
Dataset Dataset

The original dataset with the new variable inserted.

Source code in earthcarekit/utils/xarray_utils/_insert_var.py
def insert_var(
    ds: Dataset,
    var: str,
    data: Any,
    index: int | None = None,
    before_var: str | None = None,
    after_var: str | None = None,
) -> Dataset:
    """
    Inserts a new variable in a `xarray.Dataset` before or after a given variable or at a given index.

    Args:
        ds (Dataset):
            The original dataset to which the variable will be added.
        var (str):
            Name of the new variable to be added.
        data (Any):
            Data stored in the new variable.
        index (int | None, optional):
            Index at which the new variable will be added. Will be ignored when either `before_var` or
            `after_var` are given and valid. Defaults to None.
        before_var (str | None, optional):
            Name of the variable before which the new variable should be inserted. Defaults to None.
        after_var (str | None, optional):
            Name of the variable after which the new variable should be inserted. Will be ignored
            when `before_var` is given and valid. Defaults to None.

    Returns:
        Dataset: The original dataset with the new variable inserted.
    """
    # Replace any pre-existing variable of the same name.
    if var in ds.data_vars:
        ds = ds.drop_vars(var)

    position_requested = (
        isinstance(index, int) or isinstance(before_var, str) or isinstance(after_var, str)
    )
    if not position_requested:
        # No placement requested: simply append the variable.
        ds[var] = data
        return ds

    var_order = list(ds.data_vars)

    # Placement priority: before_var > after_var > index > end of list.
    if isinstance(before_var, str) and before_var in var_order:
        insert_at = var_order.index(before_var)
    elif isinstance(after_var, str) and after_var in var_order:
        insert_at = var_order.index(after_var) + 1
    elif isinstance(index, int):
        insert_at = index
    else:
        insert_at = len(var_order)

    var_order.insert(insert_at, var)

    ds[var] = data
    # Re-select variables in the desired order.
    return ds[var_order]

merge_datasets

merge_datasets(ds1: Dataset, ds2: Dataset, keep_sec: bool = False) -> Dataset

Merges two datasets while keeping all global attributes from one dataset.

Source code in earthcarekit/utils/xarray_utils/_merge.py
def merge_datasets(
    ds1: xr.Dataset,
    ds2: xr.Dataset,
    keep_sec: bool = False,
) -> xr.Dataset:
    """Merges two datasets while keeping all global attributes from one dataset.

    Args:
        ds1 (xr.Dataset): First dataset.
        ds2 (xr.Dataset): Second dataset.
        keep_sec (bool, optional): If True, attributes and encoding are taken
            from `ds2`, otherwise from `ds1`. Defaults to False.

    Returns:
        xr.Dataset: The merged dataset.
    """
    ds_merged = xr.merge([ds1, ds2])
    attr_source = ds2 if keep_sec else ds1
    ds_merged.attrs = attr_source.attrs.copy()
    ds_merged.encoding = attr_source.encoding.copy()
    return ds_merged

remove_dims

remove_dims(ds: Dataset, dims_to_remove: list[str]) -> Dataset

Drop a list of dimensions and all associated variables and coordinates from a given xarray.Dataset.

Source code in earthcarekit/utils/xarray_utils/_delete.py
def remove_dims(ds: xr.Dataset, dims_to_remove: list[str]) -> xr.Dataset:
    """Drop a list of dimensions and all associated variables and coordinates from a given `xarray.dataset`."""
    doomed = set(dims_to_remove)

    def _uses_doomed_dim(name) -> bool:
        # True if the variable/coordinate is defined on any dimension to remove.
        return any(d in doomed for d in ds[name].dims)

    names_to_drop = [v for v in ds.variables if _uses_doomed_dim(v)]
    names_to_drop += [c for c in ds.coords if _uses_doomed_dim(c)]

    result = ds.drop_vars(names_to_drop, errors="ignore")

    # Remove any dimensions that survived the variable drop.
    for d in dims_to_remove:
        if d in result.dims:
            result = result.drop_dims(d)

    return result