Skip to content

API reference

earthcarekit.filter

Dataset filtering utilities.

Notes

This module depends on other internal modules:


filter_frame

filter_frame(
    ds: Dataset,
    along_track_dim: str = ALONG_TRACK_DIM,
    lat_var: str = TRACK_LAT_VAR,
    frame_id: str | None = None,
    add_trim_index_offset_var: bool = True,
    trim_index_offset_var: str = "trim_index_offset",
) -> Dataset

Trims the dataset to the region within the latitude frame bounds.

Parameters:

Name Type Description Default
ds Dataset

Input dataset to be trimmed.

required
along_track_dim str

Dimension along which to trim. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
lat_var str

Name of the latitude variable. Defaults to TRACK_LAT_VAR.

TRACK_LAT_VAR
frame_id str | None

EarthCARE frame ID (single character between "A" and "H"). If given, speeds up trimming. Defaults to None.

None
add_trim_index_offset_var bool

Whether the index offset between the original and trimmed dataset is stored in the trimmed dataset (variable: "trim_index_offset"). Defaults to True.

True

Returns:

Type Description
Dataset

xarray.Dataset: Trimmed dataset.

Source code in earthcarekit/filter/_frame.py
def filter_frame(
    ds: Dataset,
    along_track_dim: str = ALONG_TRACK_DIM,
    lat_var: str = TRACK_LAT_VAR,
    frame_id: str | None = None,
    add_trim_index_offset_var: bool = True,
    trim_index_offset_var: str = "trim_index_offset",
) -> Dataset:
    """
    Cuts the dataset down to the samples lying inside its latitude frame bounds.

    The along-track index range of the frame is obtained from
    `get_frame_index_range` and applied as a positional slice.

    Args:
        ds (xarray.Dataset):
            Dataset that should be trimmed.
        along_track_dim (str, optional):
            Name of the dimension that is sliced. Defaults to ALONG_TRACK_DIM.
        lat_var (str, optional):
            Name of the latitude variable handed to the frame lookup.
            Defaults to TRACK_LAT_VAR.
        frame_id (str | None, optional):
            EarthCARE frame ID (single character between "A" and "H").
            If given, speeds up trimming. Defaults to None.
        add_trim_index_offset_var (bool, optional):
            If True, the first index of the selection is stored in the trimmed
            dataset, but only when that index is greater than zero.
            Defaults to True.
        trim_index_offset_var (str, optional):
            Name of the variable that receives the index offset.
            Defaults to "trim_index_offset".

    Returns:
        xarray.Dataset: Trimmed dataset.
    """
    frame_bounds = get_frame_index_range(frame_id=frame_id, ds=ds, lat_var=lat_var)
    trimmed = ds.isel({along_track_dim: slice(*frame_bounds)})

    # Nothing to record if the caller opted out or nothing was cut at the front.
    if not add_trim_index_offset_var or not (frame_bounds[0] > 0):
        return trimmed

    trimmed = insert_var(
        ds=trimmed,
        var=trim_index_offset_var,
        data=int(frame_bounds[0]),
        index=0,
        after_var="processing_start_time",
    )
    provenance_note = {
        "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
    }
    trimmed[trim_index_offset_var] = trimmed[trim_index_offset_var].assign_attrs(
        provenance_note
    )
    return trimmed

filter_index

filter_index(
    ds: Dataset,
    index: int | slice | NDArray | Sequence,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
) -> Dataset

Filters a dataset given an along-track index number, list/array or range/slice.

Parameters:

Name Type Description Default
ds Dataset

Input dataset with along-track dimension.

required
index int | slice | NDArray | Sequence

Index(es) to filter.

required
along_track_dim str

Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
pad_idxs int

Number of additional samples added at both sides of the selection. This input is ignored when index is an array-like. Defaults to 0.

0

Returns:

Name Type Description
Dataset Dataset

Filtered dataset.

Examples:

>>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
>>> with eck.read_product(fp) as ds:
>>>     ds_filtered = eck.filter_index(ds, 123)
>>>     print(ds_filtered.sizes)
Frozen({'along_track': 1, 'vertical': 218})
>>>     ds_filtered = eck.filter_index(ds, slice(0, 1000))
>>>     print(ds_filtered.sizes)
Frozen({'along_track': 1000, 'vertical': 218})
>>>     ds_filtered = eck.filter_index(ds, (0, 1000))
>>>     print(ds_filtered.sizes)
Frozen({'along_track': 2, 'vertical': 218})
Source code in earthcarekit/filter/_filter_index.py
def filter_index(
    ds: Dataset,
    index: int | slice | NDArray | Sequence,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
) -> Dataset:
    """
    Filters a dataset given an along-track index number, list/array or range/slice.

    Args:
        ds (Dataset): Input dataset with along-track dimension.
        index (int | slice | NDArray | Sequence): Index(es) to filter. Scalars
            (including NumPy integer scalars and single-element array-likes) select
            one sample; negative scalars within range count from the end. Slices may
            leave `start`/`stop` as None.
        along_track_dim (str, optional): Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.
        trim_index_offset_var (str, optional): Name of the variable that stores the index offset
            relative to the untrimmed dataset. Defaults to "trim_index_offset".
        pad_idxs (int, optional): Number of additional samples added at both sides of the selection.
            The padded range is clipped to the extent of the dataset.
            This input is ignored when `index` is an array-like. Defaults to 0.

    Returns:
        Dataset: Filtered dataset.

    Raises:
        ValueError: If `index` is an empty array-like.

    Examples:
        ```python
        >>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
        >>> with eck.read_product(fp) as ds:
        >>>     ds_filtered = eck.filter_index(ds, 123)
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 1, 'vertical': 218})
        >>>     ds_filtered = eck.filter_index(ds, slice(0, 1000))
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 1000, 'vertical': 218})
        >>>     ds_filtered = eck.filter_index(ds, (0, 1000))
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 2, 'vertical': 218})
        ```
    """
    n_samples = int(ds.sizes[along_track_dim])

    # Collapse 0-d arrays and single-element array-likes to a plain scalar.
    if isinstance(index, np.ndarray) and index.ndim == 0:
        index = int(index)
    elif isinstance(index, (Sequence, np.ndarray)):
        if len(index) == 0:
            raise ValueError("index must be integer or non-empty array")
        elif len(index) == 1:
            index = int(index[0])

    # NumPy integer scalars (e.g. the result of np.argmin) are not `int`
    # instances and would otherwise be treated as an array further down.
    if isinstance(index, (int, np.integer)):
        position = int(index)
        if -n_samples <= position < 0:
            # slice(-1, 0) would be empty, so resolve "from the end" explicitly.
            position += n_samples
        index = slice(position, position + 1)

    if isinstance(index, slice):
        step = index.step
        if step is None or step > 0:
            # Resolve None/negative bounds against the dimension length, then pad
            # and clip so that padding never wraps around the start of the array.
            start, stop, _ = index.indices(n_samples)
            start = max(start - pad_idxs, 0)
            stop = min(stop + pad_idxs, n_samples)
            index = slice(start, stop, step)
        else:
            # Non-positive steps keep the previous, unnormalised handling.
            index = slice(index.start - pad_idxs, index.stop + pad_idxs, step)
    else:
        index = flatten_array(index)

    ds_new = ds.copy().isel({along_track_dim: index})
    new_trim_index_offset: int | NDArray = 0

    if isinstance(index, slice):
        if isinstance(index.step, (int, np.integer)) and index.step > 1:
            # Strided selection: keep the original index of every selected sample.
            new_trim_index_offset = np.arange(index.start, index.stop, index.step)
        else:
            new_trim_index_offset = int(index.start)

    if isinstance(index, np.ndarray):
        # A single scalar offset only describes a run of consecutive, increasing
        # indices; anything else needs the per-sample original indices.
        if index.size > 1 and np.any(np.diff(index) != 1):
            new_trim_index_offset = index
        else:
            new_trim_index_offset = int(index[0])

    if trim_index_offset_var in ds_new:
        # Accumulate on top of an offset left by an earlier trimming step.
        old_trim_index_offset = ds_new[trim_index_offset_var].values
        trim_index_offset = np.asarray(old_trim_index_offset + new_trim_index_offset)

        if len(trim_index_offset.shape) == 0:
            ds_new[trim_index_offset_var] = trim_index_offset
        else:
            ds_new[trim_index_offset_var] = ("new_dim", trim_index_offset)
    else:
        ds_new = insert_var(
            ds=ds_new,
            var=trim_index_offset_var,
            data=new_trim_index_offset,
            index=0,
            after_var="processing_start_time",
        )
        ds_new[trim_index_offset_var] = ds_new[trim_index_offset_var].assign_attrs(
            {
                "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
            }
        )

    return ds_new

filter_latitude

filter_latitude(
    ds: Dataset,
    lat_range: NumberPairNoneLike,
    start_before_pole: bool = True,
    end_before_pole: bool = True,
    only_center: bool = False,
    lat_var: str = TRACK_LAT_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> Dataset

Filters a dataset to include only points within a specified latitude range.

Parameters:

Name Type Description Default
ds Dataset

Input dataset with geolocation data.

required
lat_range NumberPairNoneLike

A pair of latitude values (min_lat, max_lat) defining the selection range.

required
start_before_pole bool

If True, selection starts before the pole when the track crosses one. Defaults to True.

True
end_before_pole bool

If True, selection ends before the pole when the track crosses one. Defaults to True.

True
only_center bool

If True, only the sample at the center index of selection is returned. Defaults to False.

False
lat_var str

Name of the latitude variable. Defaults to TRACK_LAT_VAR.

TRACK_LAT_VAR
along_track_dim str

Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
pad_idxs int

Number of additional samples added at both sides of the selection. Defaults to 0.

0
shift_idxs int

Offset number to shift selection of samples. Defaults to 0.

0

Raises:

Type Description
ValueError

If selection is empty.

Returns:

Type Description
Dataset

xr.Dataset: Filtered dataset containing only points within the specified latitude range.

Examples:

>>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
>>> with eck.read_product(fp) as ds:
>>>     print(ds.latitude.values)
[-22.50316844 -22.51202978 -22.52089178 ... -67.48243216 -67.49074691 -67.49906148]
>>>     ds_filtered = eck.filter_latitude(ds, (-40, -30))
>>>     print(ds_filtered.latitude.values)
[-30.0036885  -30.01258957 -30.02149091 ... -39.98112826 -39.98962597 -39.99812425]
Source code in earthcarekit/filter/_filter_latitude.py
def filter_latitude(
    ds: xr.Dataset,
    lat_range: NumberPairNoneLike,
    start_before_pole: bool = True,
    end_before_pole: bool = True,
    only_center: bool = False,
    lat_var: str = TRACK_LAT_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> xr.Dataset:
    """
    Filters a dataset to include only points within a specified latitude range.

    Args:
        ds (xr.Dataset): Input dataset with geolocation data.
        lat_range (NumberPairNoneLike): A pair of latitude values defining the selection range.
            Missing values fall back to the first/last latitude of the track. When the track crosses
            a pole and the selection starts before but ends after it, the first value is applied
            before the pole and the second value after the pole.
        start_before_pole (bool, optional): If True, selection starts before the pole when the track crosses one. Defaults to True.
        end_before_pole (bool, optional): If True, selection ends before the pole when the track crosses one. Defaults to True.
        only_center (bool, optional): If True, only the sample at the center index of selection is returned. Defaults to False.
        lat_var (str, optional): Name of the latitude variable. Defaults to TRACK_LAT_VAR.
        along_track_dim (str, optional): Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.
        trim_index_offset_var (str, optional): Name of the variable keeping track of index offsets caused by
            dataset trimming/filtering. Defaults to "trim_index_offset".
        pad_idxs (int, optional): Number of additional samples added at both sides of the selection. Defaults to 0.
        shift_idxs (int, optional): Offset number to shift selection of samples. Defaults to 0.

    Raises:
        ValueError: If selection is empty.

    Returns:
        xr.Dataset: Filtered dataset containing only points within the specified latitude range.

    Examples:
        ```python
        >>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
        >>> with eck.read_product(fp) as ds:
        >>>     print(ds.latitude.values)
        [-22.50316844 -22.51202978 -22.52089178 ... -67.48243216 -67.49074691 -67.49906148]
        >>>     ds_filtered = eck.filter_latitude(ds, (-40, -30))
        >>>     print(ds_filtered.latitude.values)
        [-30.0036885  -30.01258957 -30.02149091 ... -39.98112826 -39.98962597 -39.99812425]
        ```
    """
    lats = ds[lat_var].values

    satellite_crosses_pole, is_first_increase, mask_before_pole, mask_after_pole = (
        _get_pole_crossing_masks(ds, lat_var=lat_var)
    )

    lat_range = validate_numeric_pair(lat_range, fallback=(lats[0], lats[-1]))

    # Plain "inside the band" test, independent of the order of the two bounds.
    lats_mask: NDArray[np.bool_] = (lats >= np.min(lat_range)) & (lats <= np.max(lat_range))

    if satellite_crosses_pole and start_before_pole and not end_before_pole:
        # Selection spans the pole: first bound applies before the pole,
        # second bound applies after it.
        if is_first_increase:
            mask_from_start = lats >= lat_range[0]
            mask_from_end = lats >= lat_range[1]
        else:
            mask_from_start = lats <= lat_range[0]
            mask_from_end = lats <= lat_range[1]

        mask_from_start_before_pole = np.logical_and(mask_before_pole, mask_from_start)
        mask_from_end_after_pole = np.logical_and(mask_after_pole, mask_from_end)

        mask = np.logical_or(mask_from_start_before_pole, mask_from_end_after_pole)
    elif satellite_crosses_pole and start_before_pole and end_before_pole:
        mask = np.logical_and(lats_mask, mask_before_pole)
    elif satellite_crosses_pole and not start_before_pole:
        mask = np.logical_and(lats_mask, mask_after_pole)
    else:
        mask = lats_mask

    if only_center:
        # Reduce the selection to the sample in the middle of the selected indices.
        mask_true_idxs = np.where(mask)[0]
        if len(mask_true_idxs) > 0:
            idx_center = mask_true_idxs[len(mask_true_idxs) // 2]
            mask[:] = False
            mask[idx_center] = True

    mask = pad_true_sequence(mask, pad_idxs)
    mask = shift_true_sequence(mask, shift_idxs)

    if np.sum(mask) == 0:
        msg = f"No data falls into the given latitude range!\nIn the dataset latitude falls between {np.min(lats)} and {np.max(lats)}.\n"
        if satellite_crosses_pole:
            msg += "Note that the satellite crosses a pole (set `start_before_pole` and `end_before_pole`\nto clarify how the start and end of the range should be interpreted)."
        else:
            msg += "The satellite is not crossing a pole."
        raise ValueError(msg)

    da_mask: xr.DataArray = xr.DataArray(mask, dims=[along_track_dim], name=lat_var)

    # Only variables that have the along-track dimension are masked;
    # all others are copied over unchanged.
    ds_new: xr.Dataset = xr.Dataset(
        {
            var: (
                ds[var].copy().where(da_mask, drop=True)
                if along_track_dim in ds[var].dims
                else ds[var].copy()
            )
            for var in ds.data_vars
        }
    )
    ds_new.attrs = ds.attrs.copy()
    ds_new.encoding = ds.encoding.copy()

    # Index of the first selected sample in the input dataset.
    new_trim_index_offset: int = int(np.argmax(mask))
    if trim_index_offset_var in ds_new:
        # An earlier filtering step may have stored an array-valued offset;
        # collapse it to its first element (same handling as in filter_time)
        # so the int() conversion below does not fail.
        if len(ds_new[trim_index_offset_var].values.shape) != 0:
            ds_new[trim_index_offset_var] = (
                [],
                ds_new[trim_index_offset_var].values[0],
            )
        old_trim_index_offset = int(ds_new[trim_index_offset_var].values)
        trim_index_offset = old_trim_index_offset + new_trim_index_offset
        ds_new[trim_index_offset_var].values = np.asarray(trim_index_offset)
    else:
        ds_new = insert_var(
            ds=ds_new,
            var=trim_index_offset_var,
            data=new_trim_index_offset,
            index=0,
            after_var="processing_start_time",
        )
        ds_new[trim_index_offset_var] = ds_new[trim_index_offset_var].assign_attrs(
            {
                "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
            }
        )

    return ds_new

filter_radius

filter_radius(
    ds: Dataset,
    radius_km: float = 100.0,
    center_lat: float | None = None,
    center_lon: float | None = None,
    site: SiteLike | None = None,
    lat_var: str = TRACK_LAT_VAR,
    lon_var: str = TRACK_LON_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    method: Literal["geodesic", "haversine"] = "geodesic",
    closest: bool = False,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> Dataset

Filters a dataset to include only points within a specified radius of a geographic location.

Parameters:

Name Type Description Default
ds Dataset

Input dataset with geolocation data.

required
radius_km float

Radius (in kilometers) around the center location.

100.0
site SiteLike

Site object or name from which center location will be retrieved, alternatively center_lat and center_lon must be set.

None
center_lat float

Latitude of the center point, alternatively site must be set.

None
center_lon float

Longitude of the center point, alternatively site must be set.

None
lat_var str

Name of the latitude variable. Defaults to TRACK_LAT_VAR.

TRACK_LAT_VAR
lon_var str

Name of the longitude variable. Defaults to TRACK_LON_VAR.

TRACK_LON_VAR
along_track_dim str

Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
method Literal['geodesic', 'haversine']

Distance calculation method. Defaults to "geodesic".

'geodesic'
closest bool

If True, only the single closest sample is returned, otherwise all samples within radius. Defaults to False.

False
trim_index_offset_var str

Name of the dataset variable that keeps track of index offsets caused by dataset trimming/filtering. Defaults to "trim_index_offset".

'trim_index_offset'
pad_idxs int

Number of additional samples added at both sides of the selection. Defaults to 0.

0
shift_idxs int

Offset number to shift selection of samples. Defaults to 0.

0

Returns:

Type Description
Dataset

xr.Dataset: Filtered dataset containing only points within the specified radius.

Raises:

Type Description
EmptyFilterResultError

If no data points are found within the radius.

ValueError

If the method is invalid.

Examples:

>>> fp = "ECA_EXBB_ATL_EBD_2A_20240902T210023Z_20251107T142547Z_01508B.h5"
>>> with eck.read_product(fp) as ds:
>>>     print(ds.sizes)
Frozen({'along_track': 5143, 'vertical': 242, 'layer': 25, 'n_state': 351})
>>>     ds_filtered = eck.filter_radius(ds, site="dushanbe")
>>>     print(ds_filtered.sizes)
Frozen({'along_track': 197, 'vertical': 242, 'layer': 25, 'n_state': 351})
>>>     ds_filtered = eck.filter_radius(ds, site="dushanbe", radius_km=200)
>>>     print(ds_filtered.sizes)
Frozen({'along_track': 399, 'vertical': 242, 'layer': 25, 'n_state': 351})
Source code in earthcarekit/filter/_filter_radius.py
def filter_radius(
    ds: xr.Dataset,
    radius_km: float = 100.0,
    center_lat: float | None = None,
    center_lon: float | None = None,
    site: SiteLike | None = None,
    lat_var: str = TRACK_LAT_VAR,
    lon_var: str = TRACK_LON_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    method: Literal["geodesic", "haversine"] = "geodesic",
    closest: bool = False,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> xr.Dataset:
    """
    Filters a dataset to include only points within a specified radius of a geographic location.

    Args:
        ds (xr.Dataset): Input dataset with geolocation data.
        radius_km (float): Radius (in kilometers) around the center location.
        center_lat (float, optional): Latitude of the center point,
            alternatively `site` must be set.
        center_lon (float, optional): Longitude of the center point,
            alternatively `site` must be set.
        site (SiteLike, optional): Site object or name from which center location will be retrieved,
            alternatively `center_lat` and `center_lon` must be set. If a site is given it takes
            precedence over `center_lat` and `center_lon`.
        lat_var (str, optional): Name of the latitude variable. Defaults to TRACK_LAT_VAR.
        lon_var (str, optional): Name of the longitude variable. Defaults to TRACK_LON_VAR.
        along_track_dim (str, optional): Dimension along which to apply filtering. Defaults to ALONG_TRACK_DIM.
        method (Literal["geodesic", "haversine"], optional): Distance calculation method. Defaults to "geodesic".
        closest (bool, optional): If True, only the single closest sample is returned, otherwise all samples within radius. Defaults to False.
        trim_index_offset_var (str, optional): Name of the dataset variable keeping track of index offsets
            caused by dataset trimming/filtering. Defaults to "trim_index_offset".
        pad_idxs (int, optional): Number of additional samples added at both sides of the selection. Defaults to 0.
        shift_idxs (int, optional): Offset number to shift selection of samples. Defaults to 0.

    Returns:
        xr.Dataset: Filtered dataset containing only points within the specified radius.

    Raises:
        EmptyFilterResultError: If no data points are found within the radius.
        ValueError: If the `method` is invalid or if neither `site` nor both of
            `center_lat` and `center_lon` are given.

    Examples:
        ```python
        >>> fp = "ECA_EXBB_ATL_EBD_2A_20240902T210023Z_20251107T142547Z_01508B.h5"
        >>> with eck.read_product(fp) as ds:
        >>>     print(ds.sizes)
        Frozen({'along_track': 5143, 'vertical': 242, 'layer': 25, 'n_state': 351})
        >>>     ds_filtered = eck.filter_radius(ds, site="dushanbe")
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 197, 'vertical': 242, 'layer': 25, 'n_state': 351})
        >>>     ds_filtered = eck.filter_radius(ds, site="dushanbe", radius_km=200)
        >>>     print(ds_filtered.sizes)
        Frozen({'along_track': 399, 'vertical': 242, 'layer': 25, 'n_state': 351})
        ```
    """
    _center_lat: float
    _center_lon: float

    # Site names are resolved to Site objects first.
    if isinstance(site, str):
        site = get_site(site)

    if isinstance(site, Site):
        _center_lat = site.latitude
        _center_lon = site.longitude
    elif isinstance(center_lat, (int, float, np.integer, np.floating)) and isinstance(
        center_lon, (int, float, np.integer, np.floating)
    ):
        _center_lat = float(center_lat)
        _center_lon = float(center_lon)
    else:
        raise ValueError("Either 'site' or 'center_lat' and 'center_lon' must be given.")

    if method not in ["geodesic", "haversine"]:
        raise ValueError(r'Invalid method choosen. Available methods: {"geodesic", "haversine"}')

    satellite_coords = get_coords(ds, lat_var=lat_var, lon_var=lon_var)

    center_coords = (_center_lat, _center_lon)

    if method == "geodesic":
        distances = geodesic(center_coords, satellite_coords)
    else:
        distances = haversine(center_coords, satellite_coords)

    mask = np.array(distances < radius_km)

    if closest:
        # Keep only the sample with the smallest distance to the center.
        closest_filtered_index = int(np.argmin(distances))
        mask[:] = False
        mask[closest_filtered_index] = True

    mask = pad_true_sequence(mask, pad_idxs)
    mask = shift_true_sequence(mask, shift_idxs)

    da_mask = xr.DataArray(data=mask, dims=[along_track_dim])
    if np.sum(da_mask.values) < 1:
        # Report the resolved center so the message is also meaningful when the
        # location was given via `site` (center_lat/center_lon are None then).
        raise EmptyFilterResultError(
            f"Could not find valid overpass for given inputs. Data lies outside the given {radius_km} km radius around ({_center_lat} degN {_center_lon} degE).",
            min_distance=float(np.min(distances)),
        )

    # Only variables that have the along-track dimension are masked;
    # all others are copied over unchanged.
    ds_new: xr.Dataset = xr.Dataset(
        {
            var: (
                ds[var].copy().where(da_mask, drop=True)
                if along_track_dim in ds[var].dims
                else ds[var].copy()
            )
            for var in ds.data_vars
        }
    )
    ds_new.attrs = ds.attrs.copy()
    ds_new.encoding = ds.encoding.copy()

    # Index of the first selected sample in the input dataset.
    new_trim_index_offset: int = int(np.argmax(mask))
    if trim_index_offset_var in ds_new:
        # An earlier filtering step may have stored an array-valued offset;
        # collapse it to its first element (same handling as in filter_time)
        # so the int() conversion below does not fail.
        if len(ds_new[trim_index_offset_var].values.shape) != 0:
            ds_new[trim_index_offset_var] = (
                [],
                ds_new[trim_index_offset_var].values[0],
            )
        old_trim_index_offset = int(ds_new[trim_index_offset_var].values)
        trim_index_offset = old_trim_index_offset + new_trim_index_offset
        ds_new[trim_index_offset_var].values = np.asarray(trim_index_offset)
    else:
        ds_new = insert_var(
            ds=ds_new,
            var=trim_index_offset_var,
            data=new_trim_index_offset,
            index=0,
            after_var="processing_start_time",
        )
        ds_new[trim_index_offset_var] = ds_new[trim_index_offset_var].assign_attrs(
            {
                "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
            }
        )
    return ds_new

filter_time

filter_time(
    ds: Dataset,
    time_range: TimeRangeLike | Iterable | None = None,
    timestamp: TimestampLike | None = None,
    only_center: bool = False,
    time_var: str = TIME_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> Dataset

Filters an xarray Dataset to include only samples within a given time range.

Parameters:

Name Type Description Default
ds Dataset

The input dataset containing a time coordinate.

required
time_range TimeRangeLike | Iterable | None

Start and end time of the range to filter, as strings or pandas timestamps. Defaults to None.

None
timestamp TimestampLike | None

A single timestamp; the sample closest to it is returned. Defaults to None.

None
only_center bool

If True, only the sample at the center index of selection is returned. Defaults to False.

False
time_var str

Name of the time variable in ds. Defaults to TIME_VAR.

TIME_VAR
along_track_dim str

Dimension name along which time is defined. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
pad_idxs int

Number of additional samples added at both sides of the selection. Defaults to 0.

0
shift_idxs int

Offset number to shift selection of samples. Defaults to 0.

0

Returns:

Type Description
Dataset

xr.Dataset: Subset of ds containing only samples within the specified time range.

Examples:

>>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
>>> with eck.read_product(fp) as ds:
>>>     print(ds.time.values[[0, -1]])
['2026-01-08T03:04:08.393852288' '2026-01-08T03:15:57.401298304']
>>>     ds_filtered = eck.filter_time(ds, time_range=("2026-01-08 03:10", "2026-01-08 03:12"))
>>>     print(ds_filtered.time.values[[0, -1]])
['2026-01-08T03:10:00.115605248' '2026-01-08T03:11:59.985651712']
Source code in earthcarekit/filter/_filter_time.py
def filter_time(
    ds: xr.Dataset,
    time_range: TimeRangeLike | Iterable | None = None,
    timestamp: TimestampLike | None = None,
    only_center: bool = False,
    time_var: str = TIME_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    trim_index_offset_var: str = "trim_index_offset",
    pad_idxs: int = 0,
    shift_idxs: int = 0,
) -> xr.Dataset:
    """
    Reduces an xarray Dataset to the samples selected by a time range or a single timestamp.

    Args:
        ds (xr.Dataset): Dataset holding the time variable.
        time_range (TimeRangeLike | Iterable | None):
            Start and end time of the range to filter, as strings or pandas timestamps.
            Must not be combined with `timestamp`. Defaults to None.
        timestamp (TimestampLike | None): A single timestamp; the sample closest to it is selected.
            Must not be combined with `time_range`. Defaults to None.
        only_center (bool, optional): If True, only the sample at the center index of selection is returned. Defaults to False.
        time_var (str, optional): Name of the time variable in `ds`. Defaults to TIME_VAR.
        along_track_dim (str, optional): Dimension name along which time is defined. Defaults to ALONG_TRACK_DIM.
        trim_index_offset_var (str, optional): Name of the variable storing the index offset relative to
            the untrimmed dataset. Defaults to "trim_index_offset".
        pad_idxs (int, optional): Number of additional samples added at both sides of the selection. Defaults to 0.
        shift_idxs (int, optional): Offset number to shift selection of samples. Defaults to 0.

    Returns:
        xr.Dataset: Subset of `ds` containing only the selected samples.

    Raises:
        ValueError: If both `time_range` and `timestamp` are given, or if the selection is empty.

    Examples:
        ```python
        >>> fp = "ECA_EXBC_CPR_FMR_2A_20260108T030403Z_20260108T042349Z_09167F.h5"
        >>> with eck.read_product(fp) as ds:
        >>>     print(ds.time.values[[0, -1]])
        ['2026-01-08T03:04:08.393852288' '2026-01-08T03:15:57.401298304']
        >>>     ds_filtered = eck.filter_time(ds, time_range=("2026-01-08 03:10", "2026-01-08 03:12"))
        >>>     print(ds_filtered.time.values[[0, -1]])
        ['2026-01-08T03:10:00.115605248' '2026-01-08T03:11:59.985651712']
        ```
    """
    if not (time_range is None or timestamp is None):
        raise ValueError("Can not use both arguments time_range and timestamp at the same time.")

    selection = get_filter_time_mask(
        ds=ds,
        time_range=time_range,
        timestamp=timestamp,
        only_center=only_center,
        time_var=time_var,
        pad_idxs=pad_idxs,
        shift_idxs=shift_idxs,
    )

    if np.sum(selection) == 0:
        all_times = ds[time_var].values
        raise ValueError(
            f"No data falls into the given time range!\n"
            f"In the dataset time ranges from {all_times[0]} to {all_times[-1]}.\n"
        )

    da_mask: xr.DataArray = xr.DataArray(selection, dims=[along_track_dim], name=time_var)

    # Mask every variable that has the along-track dimension; copy the rest as-is.
    filtered_vars = {}
    for name in ds.data_vars:
        data_array = ds[name].copy()
        if along_track_dim in ds[name].dims:
            data_array = data_array.where(da_mask, drop=True)
        filtered_vars[name] = data_array

    ds_new: xr.Dataset = xr.Dataset(filtered_vars)
    ds_new.attrs = ds.attrs.copy()
    ds_new.encoding = ds.encoding.copy()

    # Index of the first selected sample in the input dataset.
    first_selected: int = int(np.argmax(selection))

    if trim_index_offset_var not in ds_new:
        ds_new = insert_var(
            ds=ds_new,
            var=trim_index_offset_var,
            data=first_selected,
            index=0,
            after_var="processing_start_time",
        )
        provenance_note = {
            "earthcarekit": "Added by earthcarekit: Used to calculate the index in the original, untrimmed dataset, i.e. by addition."
        }
        ds_new[trim_index_offset_var] = ds_new[trim_index_offset_var].assign_attrs(
            provenance_note
        )
        return ds_new

    # An existing array-valued offset is collapsed to its first element
    # before the new offset is added on top.
    if len(ds_new[trim_index_offset_var].values.shape) != 0:
        ds_new[trim_index_offset_var] = (
            [],
            ds_new[trim_index_offset_var].values[0],
        )
    previous_offset = int(ds_new[trim_index_offset_var].values)
    ds_new[trim_index_offset_var].values = np.asarray(previous_offset + first_selected)

    return ds_new