Skip to content

API reference

earthcarekit.read

File reading utilities.

Notes

This module depends on other internal modules:


FileAgency

Bases: FileInfoEnum

Source code in earthcarekit/read/info/agency.py
class FileAgency(FileInfoEnum):
    """Agency indicator of an EarthCARE product (ESA or JAXA)."""

    ESA = "E"
    JAXA = "J"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileAgency":
        """Resolves the product agency (ESA or JAXA) from a name, code, file or dataset.

        A string is upper-cased and tried first as a member name, then as a member
        value. If neither matches, or if `input` is not a string, the lookup is
        delegated to `get_file_agency`.
        """
        if not isinstance(input, str):
            return get_file_agency(input)

        # First attempt: interpret the string as a member name.
        try:
            return cls[input.upper()]
        except (AttributeError, KeyError):
            pass

        # Second attempt: interpret the string as a member value.
        try:
            return cls(input.upper())
        except ValueError:
            pass

        return get_file_agency(input)

from_input classmethod

from_input(input: str | Dataset) -> FileAgency

Infers the EarthCARE product agency (i.e. ESA or JAXA) from a given file or dataset.

Source code in earthcarekit/read/info/agency.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileAgency":
    """Resolves the product agency (ESA or JAXA) from a name, code, file or dataset.

    A string is upper-cased and tried first as a member name, then as a member
    value. If neither matches, or if `input` is not a string, the lookup is
    delegated to `get_file_agency`.
    """
    if not isinstance(input, str):
        return get_file_agency(input)

    # First attempt: interpret the string as a member name.
    try:
        return cls[input.upper()]
    except (AttributeError, KeyError):
        pass

    # Second attempt: interpret the string as a member value.
    try:
        return cls(input.upper())
    except ValueError:
        pass

    return get_file_agency(input)

FileLatency

Bases: FileInfoEnum

Source code in earthcarekit/read/info/latency.py
class FileLatency(FileInfoEnum):
    """Latency indicator of an EarthCARE product (N, O or X)."""

    NEAR_REAL_TIME = "N"
    OFFLINE = "O"
    NOT_APPLICABLE = "X"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileLatency":
        """Resolves the product latency indicator from a name, code, file or dataset.

        The indicator is N for near-real time, O for offline and X for not
        applicable. A string is upper-cased and tried first as a member name, then
        as a member value. If neither matches, or if `input` is not a string, the
        lookup is delegated to `get_file_latency`.
        """
        if not isinstance(input, str):
            return get_file_latency(input)

        # First attempt: interpret the string as a member name.
        try:
            return cls[input.upper()]
        except (AttributeError, KeyError):
            pass

        # Second attempt: interpret the string as a member value.
        try:
            return cls(input.upper())
        except ValueError:
            pass

        return get_file_latency(input)

from_input classmethod

from_input(input: str | Dataset) -> FileLatency

Infers the EarthCARE product latency indicator (i.e. N for Near-real time, O for Offline, X for not applicable) from a given name, file or dataset.

Source code in earthcarekit/read/info/latency.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileLatency":
    """Resolves the product latency indicator from a name, code, file or dataset.

    The indicator is N for near-real time, O for offline and X for not
    applicable. A string is upper-cased and tried first as a member name, then
    as a member value. If neither matches, or if `input` is not a string, the
    lookup is delegated to `get_file_latency`.
    """
    if not isinstance(input, str):
        return get_file_latency(input)

    # First attempt: interpret the string as a member name.
    try:
        return cls[input.upper()]
    except (AttributeError, KeyError):
        pass

    # Second attempt: interpret the string as a member value.
    try:
        return cls(input.upper())
    except ValueError:
        pass

    return get_file_latency(input)

FileType

Bases: FileInfoEnum

Source code in earthcarekit/read/info/type.py
class FileType(FileInfoEnum):
    """EarthCARE product types; each member's value equals its name."""

    # Level 1
    ATL_NOM_1B = "ATL_NOM_1B"
    ATL_DCC_1B = "ATL_DCC_1B"
    ATL_CSC_1B = "ATL_CSC_1B"
    ATL_FSC_1B = "ATL_FSC_1B"
    MSI_NOM_1B = "MSI_NOM_1B"
    MSI_BBS_1B = "MSI_BBS_1B"
    MSI_SD1_1B = "MSI_SD1_1B"
    MSI_SD2_1B = "MSI_SD2_1B"
    MSI_RGR_1C = "MSI_RGR_1C"
    BBR_NOM_1B = "BBR_NOM_1B"
    BBR_SNG_1B = "BBR_SNG_1B"
    BBR_SOL_1B = "BBR_SOL_1B"
    BBR_LIN_1B = "BBR_LIN_1B"
    CPR_NOM_1B = "CPR_NOM_1B"  # JAXA product
    # Level 2a
    ATL_FM__2A = "ATL_FM__2A"
    ATL_AER_2A = "ATL_AER_2A"
    ATL_ICE_2A = "ATL_ICE_2A"
    ATL_TC__2A = "ATL_TC__2A"
    ATL_EBD_2A = "ATL_EBD_2A"
    ATL_CTH_2A = "ATL_CTH_2A"
    ATL_ALD_2A = "ATL_ALD_2A"
    MSI_CM__2A = "MSI_CM__2A"
    MSI_COP_2A = "MSI_COP_2A"
    MSI_AOT_2A = "MSI_AOT_2A"
    CPR_FMR_2A = "CPR_FMR_2A"
    CPR_CD__2A = "CPR_CD__2A"
    CPR_TC__2A = "CPR_TC__2A"
    CPR_CLD_2A = "CPR_CLD_2A"
    CPR_APC_2A = "CPR_APC_2A"
    ATL_CLA_2A = "ATL_CLA_2A"  # JAXA product
    MSI_CLP_2A = "MSI_CLP_2A"  # JAXA product
    CPR_ECO_2A = "CPR_ECO_2A"  # JAXA product
    CPR_CLP_2A = "CPR_CLP_2A"  # JAXA product
    # Level 2b
    AM__MO__2B = "AM__MO__2B"
    AM__CTH_2B = "AM__CTH_2B"
    AM__ACD_2B = "AM__ACD_2B"
    AC__TC__2B = "AC__TC__2B"
    BM__RAD_2B = "BM__RAD_2B"
    BMA_FLX_2B = "BMA_FLX_2B"
    ACM_CAP_2B = "ACM_CAP_2B"
    ACM_COM_2B = "ACM_COM_2B"
    ACM_RT__2B = "ACM_RT__2B"
    ALL_DF__2B = "ALL_DF__2B"
    ALL_3D__2B = "ALL_3D__2B"
    AC__CLP_2B = "AC__CLP_2B"  # JAXA product
    ACM_CLP_2B = "ACM_CLP_2B"  # JAXA product
    ALL_RAD_2B = "ALL_RAD_2B"  # JAXA product
    # Auxiliary data
    AUX_MET_1D = "AUX_MET_1D"
    AUX_JSG_1D = "AUX_JSG_1D"
    # Orbit data
    MPL_ORBSCT = "MPL_ORBSCT"
    AUX_ORBPRE = "AUX_ORBPRE"
    AUX_ORBRES = "AUX_ORBRES"

    @classmethod
    def from_input(cls, input: str | xr.Dataset) -> "FileType":
        """Resolves the EarthCARE product type from a string, file or dataset.

        A string is normalised with `format_file_type_string` and tried first as a
        member name, then as a member value. If neither matches, or if `input` is
        not a string, the lookup is delegated to `get_file_type`.
        """
        if not isinstance(input, str):
            return get_file_type(input)

        # The normalisation stays inside each `try` so that errors it raises are
        # handled by the same except clauses as the lookup itself.
        try:
            return cls[format_file_type_string(input)]
        except (AttributeError, KeyError):
            pass

        try:
            return cls(format_file_type_string(input))
        except (ValueError, KeyError):
            pass

        return get_file_type(input)

    @classmethod
    def list(cls):
        """Returns the values of all members in definition order."""
        return [member.value for member in cls]

    def to_shorthand(self, with_dash: bool = False):
        """Returns the `FILE_TYPE_SHORT_HAND` entry for this type.

        Args:
            with_dash (bool, optional): If False, dashes are removed from the
                shorthand. Defaults to False.
        """
        shorthand = FILE_TYPE_SHORT_HAND[self.value]
        if not with_dash:
            shorthand = shorthand.replace("-", "")
        return shorthand

    def get_level(self) -> Literal["1B", "1C", "2A", "2B", "1D", "ORB"]:
        """Returns the level suffix of the type, or "ORB" for orbit data types.

        Raises:
            NotImplementedError: If the type has neither a known level suffix nor
                is one of the orbit data types.
        """
        suffix = self.value[-2:]
        if suffix in ("1B", "1C", "1D", "2A", "2B"):
            return suffix  # type: ignore

        orbit_values = (
            FileType.MPL_ORBSCT.value,
            FileType.AUX_ORBPRE.value,
            FileType.AUX_ORBRES.value,
        )
        if self.value in orbit_values:
            return "ORB"

        raise NotImplementedError(f"missing implementation for {self}")

from_input classmethod

from_input(input: str | Dataset) -> FileType

Infers the EarthCARE product type from a given file or dataset.

Source code in earthcarekit/read/info/type.py
@classmethod
def from_input(cls, input: str | xr.Dataset) -> "FileType":
    """Resolves the EarthCARE product type from a string, file or dataset.

    A string is normalised with `format_file_type_string` and tried first as a
    member name, then as a member value. If neither matches, or if `input` is
    not a string, the lookup is delegated to `get_file_type`.
    """
    if not isinstance(input, str):
        return get_file_type(input)

    # The normalisation stays inside each `try` so that errors it raises are
    # handled by the same except clauses as the lookup itself.
    try:
        return cls[format_file_type_string(input)]
    except (AttributeError, KeyError):
        pass

    try:
        return cls(format_file_type_string(input))
    except (ValueError, KeyError):
        pass

    return get_file_type(input)

LazyDataset dataclass

Warning

WARNING: EXPERIMENTAL CLASS

Interface and behaviour are subject to change in future versions!

EarthCARE data container intended as a lightweight alternative to xarray.Dataset for faster variable access.

This class partially mimics the basic interface of xarray.Dataset, providing similar syntax for variable access (e.g., ds["x"]) and related metadata (e.g., ds.dims, ds["x"].dims, ds["x"].values, ds["x"].long_name, or ds["x"].attrs["long_name"]).

Variables must be accessed at least once within a with block to be loaded.

Warning

Support by other earthcarekit tools is currently limited, but CurtainFigure should work.

Attributes:

Name Type Description
filepath str

Path to an EarthCARE data file in HDF5/NetCDF-4 format (.h5).

trim_to_frame bool

Whether to trim the dataset to latitude frame bounds. Defaults to True.

in_memory bool

If True, load dataset variables eagerly into memory. Otherwise, variables are loaded lazily upon access. If vars is provided, only the specified variables are loaded. Defaults to False.

to_geoid bool

If True, converts variables representing height/altitude values from HAE (WGS84) to AMSL (EGM96) using the geoid_offset variable. Defaults to False.

vars str | Iterable[str] | None

Variable name or collection of names to load at initialization. If None and in_memory is True, all variables are still loaded. Defaults to None.

origin Literal['native', 'derived'] | None

Product origin identifier.

  • "native": file is an original EarthCARE product.
  • "derived": file was generated from a native product through post-processing or transformation (e.g., nadir cross-sections of AUX_MET_1C).
  • None: automatically detect the origin from the filename schema.

Defaults to None.

logger Logger

Logger instance used to display debug messages. Defaults to root logger.

Example:

>>> with LazyDataset(fp) as ds:
>>>     var = "mie_attenuated_backscatter"
>>>     ds[var].attrs["long_name"] = "Co-polar part. bsc. coeff."
>>>     cfig = eck.CurtainFigure()
>>>     cfig.ecplot(ds, var)
>>>     cfig.ecplot_temperature(ds)
>>>     cfig.ecplot_elevation(ds)
Source code in earthcarekit/read/lazy/_dataset.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
@dataclass
class LazyDataset:
    """
    !!! warning
        **WARNING: EXPERIMENTAL CLASS**

        **Interface and behaviour are subject to change in future version!**

    EarthCARE data container intended as a lightweight alternative to `xarray.Dataset` for faster variable access.

    This class partially mimics the basic interface of `xarray.Dataset`, providing similar syntax for variable access
    (e.g., `ds["x"]`) and related metadata (e.g., `ds.dims`, `ds["x"].dims`, `ds["x"].values`, `ds["x"].long_name`, or `ds["x"].attrs["long_name"]`).

    Variables must be accessed at least once within a `with` block to be loaded.

    !!! warning
        Support by other `earthcarekit` tools is currently limited, but `CurtainFigure` should work.

    Attributes:
        filepath (str):
            Path to a EarthCARE data file in HDF5/NetCDF-4 format (.h5).
        trim_to_frame (bool, optional):
            Whether to trim the dataset to latitude frame bounds. Defaults to True.
        in_memory (bool, optional):
            If True, load dataset variables eagerly into memory.
            Otherwise, variables are loaded lazily upon access.
            If `vars` is provided, only the specified variables are loaded. Defaults to False.
        to_geoid (bool, optional):
            If True, converts variables representing height/altitude values from HAE (WGS84)
            to AMSL (EGM96) using the `geoid_offset` variable. Defaults to False.
        vars (str | Iterable[str] | None, optional):
            Variable name or collection of names to load at initialization.
            If None and `in_memory` is True, all variables are still loaded. Defaults to None.
        origin (Literal["native", "derived"] | None, optional):
            Product origin identifier.

            - `"native"`: file is an original EarthCARE product.
            - `"derived"`: file was generated from a native product through post-processing or \
                transformation (e.g., nadir cross-sections of `AUX_MET_1C`).
            - None: automatically detect the origin from the filename schema.

            Defaults to None.
        logger (Logger, optional):
            Logger instance used to diplay debug messages. Defaults to root logger.

    Example:

        >>> with LazyDataset(fp) as ds:
        >>>     var = "mie_attenuated_backscatter"
        >>>     ds[var].attrs["long_name"] = "Co-polar part. bsc. coeff."
        >>>     cfig = eck.CurtainFigure()
        >>>     cfig.ecplot(ds, var)
        >>>     cfig.ecplot_temperature(ds)
        >>>     cfig.ecplot_elevation(ds)
    """

    filepath: str | HTTPFile
    trim_to_frame: bool = True
    in_memory: bool = False
    to_geoid: bool = False
    vars: str | Iterable[str] | None = field(default=None, repr=False)
    origin: Literal["native", "derived"] | None = field(default=None, repr=False)
    logger: logging.Logger = logging.getLogger()
    _ds_grp_esa: str = field(default="ScienceData", repr=False)
    _ds_grp_jaxa_geo: str = field(default="ScienceData/Geo", repr=False)
    _ds_grp_jaxa_data: str = field(default="ScienceData/Data", repr=False)
    _fill_value_float: float = field(default=9e36, repr=False)
    _profile_validation_state: ProfileValidationState | None = field(default=None, repr=False)
    _slice_along_track: slice = field(default_factory=_default_slice, repr=False)
    _slice_vertical: slice = field(default_factory=_default_slice, repr=False)
    _slice_across_track: slice = field(default_factory=_default_slice, repr=False)
    _file: h5py.File = field(default=None, repr=False)
    _varname_map: dict[str, str] = field(default_factory=dict, repr=False)
    _height_vars: set[str] = field(default_factory=_default_height_vars, repr=False)
    _read: bool = field(default=True, repr=False)
    fsspec_kwargs: dict[str, Any] = field(default_factory=dict, repr=False)

    def __post_init__(self) -> None:
        """Finishes initialisation after the dataclass-generated ``__init__``.

        Resolves URL or `HTTPFile` inputs to an HTTP file handle, parses the file
        info from the path/URL, sets up product defaults for native products and,
        if `in_memory` is True and no file handle was passed in, eagerly loads
        `vars` and closes the file again.
        """
        self._info: dict[str, Any]
        self._http_file: None | HTTPFile = None
        self._fspec = None

        if isinstance(self.filepath, str) and is_url(self.filepath):
            # User-supplied fsspec options override the default ones.
            fsspec_kwargs = get_default_fsspec_kwargs()
            fsspec_kwargs.update(self.fsspec_kwargs)
            self.fsspec_kwargs = fsspec_kwargs
            fs = fsspec.filesystem(**self.fsspec_kwargs)
            self._http_file = fs.open(self.filepath, "rb")
            self.filepath = str(self._http_file.url)

        elif isinstance(self.filepath, HTTPFile):
            # Keep the handle and continue with its URL as the file path.
            self._http_file = self.filepath
            self.filepath = str(self.filepath.url)

        self._info = get_file_info_from_str(self.filepath)
        file_type = self._info["file_type"]
        self._is_jaxa: bool = self._info["agency"] == "J"
        self._nadir_index: int | None = DEFAULT_NADIR_INDEX.get(file_type)
        self._loaded_vars: list[str] = []
        self._data: dict[str, LazyVariable] = {}
        self._sizes: dict[str, int] = {}
        if self.origin is None:
            self.origin = detect_product_origin(self.filepath)
        # Product defaults are only looked up for native products.
        self._defaults: ProductDefaults | None = (
            get_defaults(file_type) if self.origin == "native" else None
        )

        if self._defaults:
            # Entries passed in by the caller win over the product defaults.
            self._varname_map = self._defaults.get_varname_map() | self._varname_map
            self._height_vars = self._height_vars.union(self._defaults.height_vars)

        if self._read is False:
            return

        if self.in_memory and self._file is None:
            # Close the file even when opening or loading fails part-way, so a
            # failed eager load does not leave an open handle behind.
            try:
                self.open()
                self.load(self.vars)
            finally:
                self.close()

    def __enter__(self: "LazyDataset") -> "LazyDataset":
        """Opens the underlying file and prepares slicing and info variables.

        Steps performed (skipped entirely when `_read` is False):

        1. Open the HDF5 file unless a valid handle already exists.
        2. Read the track latitude; for JAXA files also derive dimension sizes.
        3. If a nadir index applies, restrict the across-track axis to columns
           that contain valid latitudes and refine the nadir index from a sensor
           angle variable when one is available.
        4. Compute the along-track frame slice via `get_frame_slice_tuple`.
        5. Register scalar info variables, `trim_index_offset` and, if applicable,
           `nadir_index`.
        6. Load `vars` if given.

        Returns:
            LazyDataset: This instance.
        """
        if self._read is False:
            return self

        # (Re-)open the file only when there is no valid handle yet.
        if self._file is None or not bool(self._file.id.valid):
            if self._http_file:
                self._file = h5py.File(self._http_file, "r")
            else:
                self._file = h5py.File(self.filepath, "r")

        if self._is_jaxa:
            # `.get(group, self._file)` falls back to the file root if the group is missing.
            # NOTE(review): the group path is hard-coded here while other lookups use
            # `self._ds_grp_jaxa_geo` (same default value) — confirm this is intended.
            lats_untrimmed = np.array(
                self._file.get("ScienceData/Geo", self._file)[
                    self._varname_map.get(TRACK_LAT_VAR, TRACK_LAT_VAR)
                ][self._slice_along_track],
                dtype=np.float64,
            )
            if lats_untrimmed.ndim == 2:
                # 2-D latitude: dimension sizes are taken from its shape.
                self._sizes["along_track"] = lats_untrimmed.shape[0]
                self._sizes["across_track"] = lats_untrimmed.shape[1]
            else:
                # Otherwise take the sizes from the first height variable found.
                for height_var in ["height", "binHeight", "bin_height"]:
                    try:
                        height_shape = self._file.get(self._ds_grp_jaxa_geo, self._file)[
                            height_var
                        ].shape
                        break
                    except KeyError:
                        continue
                else:
                    # Loop finished without `break`: no candidate exists.
                    raise KeyError("missing height variable")

                self._sizes["along_track"] = height_shape[0]
                self._sizes["vertical"] = height_shape[1]
        else:
            lats_untrimmed = np.array(
                self._file.get(self._ds_grp_esa, self._file)[
                    self._varname_map.get(TRACK_LAT_VAR, TRACK_LAT_VAR)
                ][self._slice_along_track],
                dtype=np.float64,
            )

        self._slice_across_track_valid: slice
        if self._nadir_index is not None:
            lats_untrimmed = lats_untrimmed[:, self._slice_across_track]
            lats_untrimmed = LazyDataset._filter_fill_value(lats_untrimmed)
            # Across-track columns that are not entirely NaN; keep the span from
            # the first to the last such column.
            idxs = np.argwhere(~np.isnan(lats_untrimmed).all(axis=0))
            self._slice_across_track_valid = slice(int(idxs[0][0]), int(idxs[-1][0]) + 1)
            lats_untrimmed = lats_untrimmed[:, self._slice_across_track_valid]

            vars = self.variables
            # Refine the nadir index from the first available angle variable:
            # index 0 (elevation angle) uses the per-row maximum, index 1 (zenith
            # angle) the per-row minimum; the median over all rows is taken.
            for i, angle_var in enumerate((SENSOR_ELEVATION_ANGLE_VAR, SENSOR_ZENITH_ANGLE_VAR)):
                angle_var = self._varname_map.get(angle_var, angle_var)
                if angle_var not in vars:
                    continue

                if self._is_jaxa:
                    angle = np.array(
                        self._file.get(self._ds_grp_jaxa_geo, self._file)[angle_var][
                            :, self._slice_across_track
                        ][:, self._slice_across_track_valid],
                        dtype=np.float32,
                    )
                else:
                    angle = np.array(
                        self._file.get(self._ds_grp_esa, self._file)[angle_var][
                            :, self._slice_across_track
                        ][:, self._slice_across_track_valid],
                        dtype=np.float32,
                    )
                angle = LazyDataset._filter_fill_value(angle)

                if i == 0:
                    self._nadir_index = int(np.median(np.nanargmax(angle, axis=1)))
                else:
                    self._nadir_index = int(np.median(np.nanargmin(angle, axis=1)))
                break

            # Reduce the latitude to the nadir column.
            lats_untrimmed = lats_untrimmed[:, self._nadir_index]
        else:
            self._slice_across_track_valid = slice(None)

        self._slice_along_track_frame: slice = slice(
            *get_frame_slice_tuple(
                lats_untrimmed,
                frame_id=self._info["frame_id"],
            )
        )

        def _add_info_var(_var: str, _rename: str | None = None) -> None:
            # Registers `self._info[_var]` as a dimensionless variable, optionally
            # under a different name.
            if _rename is None:
                _rename = _var
            _lvar = LazyVariable(
                varname=_rename,
                dims=(),
                attrs={},
                values=np.asarray(self._info[_var]),
                _dataset=self,
            )
            self._add_var(_lvar.varname, _lvar)

        _add_info_var("filename")
        _add_info_var("file_type")
        _add_info_var("frame_id")
        _add_info_var("orbit_number")
        _add_info_var("orbit_and_frame")
        _add_info_var("baseline")
        _add_info_var("start_sensing_time", "sensing_start_time")
        _add_info_var("start_processing_time", "processing_start_time")

        # Start index of the frame slice when trimming is enabled, otherwise 0.
        lvar_trim_index_offset = LazyVariable(
            varname="trim_index_offset",
            dims=(),
            attrs={},
            values=np.asarray(
                self._slice_along_track_frame.start if self.trim_to_frame else 0, dtype=int
            ),
            _dataset=self,
        )
        self._add_var(lvar_trim_index_offset.varname, lvar_trim_index_offset)

        if self._nadir_index is not None:
            lvar_nadir_index = LazyVariable(
                varname="nadir_index",
                dims=(),
                attrs={"long_name": "Nadir index"},
                values=np.asarray(self._nadir_index),
                _dataset=self,
            )
            self._add_var(lvar_nadir_index.varname, lvar_nadir_index)

        if self.vars is not None:
            self.load(self.vars)

        return self

    def __exit__(
        self: "LazyDataset",
        exc_type: Any,
        exc: Any,
        tb: Any,
    ) -> Literal[False]:
        """Closes the data file and the HTTP file handle, where present.

        Always returns False, so exceptions raised inside the `with` block are not
        suppressed.
        """
        # The data file is closed first, then the HTTP handle.
        for handle in (self._file, self._http_file):
            if handle:
                handle.close()
        return False

    def __getitem__(self, key: str) -> LazyVariable:
        """Returns the variable `key` (``ds["x"]`` syntax) by delegating to `get`."""
        return self.get(key)

    def __contains__(self, item: str) -> bool:
        """Supports ``item in ds``.

        While the file is open, loaded and loadable variables are considered;
        otherwise only variables that are already loaded.
        """
        lookup = self.contains if self.is_open else self.contains_loaded
        return lookup(item)

    def __getattr__(self, name):
        """Exposes loaded variables as attributes (e.g. ``ds.x`` for ``ds["x"]``).

        Python calls this only after normal attribute lookup has failed.

        Raises:
            AttributeError: If no loaded variable is called `name`.
        """
        # Read the variable store through `__dict__`: evaluating `self._data` here
        # would call `__getattr__` again whenever `_data` is not set yet (e.g. on
        # an instance created via `__new__` by copy/pickle, which probe optional
        # special methods) and recurse until RecursionError.
        data = self.__dict__.get("_data")
        if data is not None:
            try:
                return data[name]
            except KeyError:
                pass
        raise AttributeError(f"'{LazyDataset.__name__}' object has no attribute '{name}'")

    def __dir__(self):
        """Lists the regular attributes followed by the names of loaded variables."""
        return [*super().__dir__(), *self._data.keys()]

    def open(self) -> None:
        """Opens the underlying file via `__enter__`; does nothing if already open."""
        if self.is_open:
            return
        self.__enter__()

    def close(self) -> None:
        """Closes the underlying file via `__exit__`; does nothing if not open."""
        if not self.is_open:
            return
        self.__exit__(None, None, None)

    @classmethod
    def get_supported_file_types(cls) -> set[str]:
        """Returns the result of the module-level `get_supported_file_types` function."""
        # Inside the method body the bare name resolves to the module-level
        # function, not to this classmethod.
        return get_supported_file_types()

    @classmethod
    def _filter_fill_value(
        cls,
        values: NDArray,
    ) -> NDArray:
        """Replaces fill values in floating-point arrays with NaN.

        Elements that are not strictly below `_fill_value_float` become NaN.
        Arrays of any non-floating dtype are returned unchanged.
        """
        if not np.issubdtype(values.dtype, np.floating):
            return values

        below_fill_value = values < cls._fill_value_float
        return np.where(below_fill_value, values, np.nan)

    @property
    def is_open(self) -> bool:
        """Whether the underlying file is open for read."""
        # A missing `id` or `valid` attribute (e.g. `_file` is None) counts as closed.
        file_id = getattr(self._file, "id", None)
        return bool(getattr(file_id, "valid", False))

    @property
    def variables(self) -> list[str]:
        """Names of variables available for access.

        While the underlying file is open, the names are read from the file's data
        group(s): for JAXA files from the geo group followed by the data group,
        otherwise from the ESA group. Datasets marked as dimension scales are left
        out. When the file is not open, only the names of already loaded variables
        are returned.
        """
        if not self.is_open:
            return list(self._data.keys())

        def _dataset_names(group_path: str) -> list[str]:
            # Falls back to the file root when the group does not exist.
            group = self._file.get(group_path, self._file)
            return [
                name
                for name, obj in group.items()
                if (
                    isinstance(obj, h5py.Dataset)
                    and _get_str_attrs(obj.attrs).get("CLASS") != "DIMENSION_SCALE"
                )
            ]

        if self._is_jaxa:
            return _dataset_names(self._ds_grp_jaxa_geo) + _dataset_names(self._ds_grp_jaxa_data)
        return _dataset_names(self._ds_grp_esa)

    @property
    def optional_variables(self) -> list[str]:
        """Names that have a generator in the product defaults.

        Regular generator names come first, followed by optional generator names.
        Empty when no product defaults are set.
        """
        if not self._defaults:
            return []
        return [
            *self._defaults.generators.keys(),
            *self._defaults.optional_generators.keys(),
        ]

    @property
    def sizes(self) -> MappingProxyType[str, int]:
        """Read-only mapping from dimension names to lengths."""
        # The proxy is a live view of `_sizes`, not a copy.
        return MappingProxyType(self._sizes)

    @property
    def dims(self) -> list[str]:
        """Dimension names, in the order they appear in `sizes`."""
        return list(self.sizes)

    @property
    def nadir_index(self) -> int | None:
        """Index of the across-track nadir pixel or None if not applicable."""
        # Starts as the per-file-type default and may be updated in `__enter__`.
        return self._nadir_index

    def contains_loaded(self, item: str) -> bool:
        """Whether a variable called `item` has already been loaded."""
        return item in self._data

    def contains(self, item: str) -> bool:
        """Whether `item` is already loaded or listed in `variables`."""
        if self.contains_loaded(item):
            return True
        return item in self.variables

    def copy(self) -> "LazyDataset":
        """Creates a new `LazyDataset` from this one.

        Every dataclass field is shallow-copied except `_file`, which is shared
        with the original. Loaded variables are duplicated via their own `copy()`
        and the dimension sizes are copied as well.
        """
        init_kwargs = {}
        for spec in fields(self):
            if spec.name == "_file":
                continue
            init_kwargs[spec.name] = copy.copy(getattr(self, spec.name))
        # The file handle is passed on as-is, not copied.
        init_kwargs["_file"] = self._file

        duplicate = LazyDataset(**init_kwargs)
        duplicate._data = {name: lvar.copy() for name, lvar in self._data.items()}
        duplicate._sizes = {dim: copy.copy(size) for dim, size in self._sizes.items()}
        return duplicate

    def get(self, var: str) -> LazyVariable:
        """Retrieves a variable by name, generating or loading it on demand.

        The lookup order is:

        1. Return the variable if it is already loaded.
        2. If product defaults are set and provide a generator for `var` (regular
           generators are checked before optional ones), run it, register every
           generated variable that is not loaded yet and return the first one.
        3. Otherwise, load the variable from the underlying dataset file.

        Args:
            var (str): Name of the variable to retrieve.

        Returns:
            LazyVariable: The requested variable.

        Raises:
            KeyError: If `var` refers to a dimension or the variable cannot be loaded.
        """
        if var in self._data:
            return self._data[var]

        if self._defaults:
            for registry_name in ("generators", "optional_generators"):
                make_lvars = getattr(self._defaults, registry_name).get(var)
                if make_lvars is None:
                    continue

                new_lvars = make_lvars(self)
                for new_lvar in new_lvars:
                    if new_lvar.varname not in self._data:
                        self._add_var(new_lvar.varname, new_lvar)
                return new_lvars[0]

        loaded = self._load_var(var)
        if loaded is None:
            raise KeyError(f"'{var}' is a dimension, not a variable")
        return loaded

    def _load_var_obj(self, var: str) -> h5py.Dataset:
        """Reads a variable from the underlying file.

        For JAXA files the geo group is searched first, then the data group; for
        other files the ESA group is used. A missing group falls back to the file
        root.

        Args:
            var (str): Name of variable to read from file.

        Raises:
            ValueError: If file is closed.
            KeyError: If `var` does not exist in file.

        Returns:
            h5py.Dataset: The requested variable.
        """
        # Fail early with the documented error: if the file was never opened,
        # `_file` is None and the lookup below would raise an unrelated
        # AttributeError instead.
        if not self.is_open:
            raise ValueError(f"I/O operation on closed file; '{var}' was not loaded yet")

        try:
            if self._is_jaxa:
                try:
                    var_obj = self._file.get(self._ds_grp_jaxa_geo, self._file)[var]
                except KeyError:
                    var_obj = self._file.get(self._ds_grp_jaxa_data, self._file)[var]
            else:
                var_obj = self._file.get(self._ds_grp_esa, self._file)[var]

            assert isinstance(var_obj, h5py.Dataset)

            return cast(h5py.Dataset, var_obj)

        except KeyError as e:
            # Kept for the case where the file became invalid during the lookup.
            if not self.is_open:
                raise ValueError(f"I/O operation on closed file; '{var}' was not loaded yet") from e
            raise

    def _load_var(
        self,
        var: str,
        dtype: np.dtype | Type[Any] | None = None,
        is_time: bool = False,
        time_unit: Literal["D", "s", "ms", "us", "ns"] | None = "s",
        time_origin: (TimestampConvertibleTypes | Literal["julian", "unix"] | None) = None,
        rolling_w: int | None = None,
    ) -> LazyVariable | None:
        """
        Reads variable from underlying file and applies post-processing according to kind and available defaults.

        Args:
            var (str): Name of the variable.
            dtype (np.dtype | Type[Any] | None, optional): Data type to convert to. Defaults to None.
            is_time (bool, optional): Whether values represent time and should to be converted to `np.datetime`. Defaults to False.
            time_unit (Literal["D", "s", "ms", "us", "ns"] | None, optional): The unit in which time is measured. Defaults to "s".
            time_origin (TimestampConvertibleTypes | Literal["julian", "unix"], optional): The reference date since when time is measured. Defaults to "2000-01-01T00:00:00".
            rolling_w (int | None, optional): Window size for optional rolling mean smoothing. Defaults to None.

        Raises:
            RuntimeError: If `var` is already loaded.

        Returns:
            LazyVariable | None: Returns None if `var` refers to a dimension name. Otherwise, returns the requested and post-processed variable.
        """
        self.logger.debug("* Loading '%s'", var)

        if var in self._data:
            raise RuntimeError(f"variable already loaded: '{var}'")

        var = self._varname_map.get(var, var)

        if var == self._varname_map.get(TIME_VAR, TIME_VAR):
            is_time = True

        var_obj: h5py.Dataset = self._load_var_obj(var)

        attrs: dict[str, str] = _get_str_attrs(var_obj.attrs)
        if attrs.get("CLASS") == "DIMENSION_SCALE":
            return None
        if "units" in attrs:
            attrs["units"] = UNITS_RENAME_MAP.get(attrs["units"], attrs["units"])

        dims = _get_var_obj_dims(var_obj=var_obj, known_sizes=self._sizes)

        _slice: list[slice] = [slice(None)] * len(dims)
        _slice_frame: list[slice] = [slice(None)] * len(dims)
        _slice_across_track_valid: list[slice] = [slice(None)] * len(dims)

        if "along_track" in dims:
            iat = dims.index("along_track")
            _slice[iat] = self._slice_along_track
            _slice_frame[iat] = self._slice_along_track_frame
        if "vertical" in dims:
            _slice[dims.index("vertical")] = self._slice_vertical
        if "across_track" in dims:
            iat = dims.index("across_track")
            _slice[iat] = self._slice_across_track
            _slice_across_track_valid[iat] = self._slice_across_track_valid

        values: NDArray
        if is_time:
            if time_origin is None:
                try:  # FIXME
                    units = np.array(var_obj.attrs["units"]).item().decode("utf-8")
                    if "nanoseconds since " in units:
                        time_unit = "ns"
                        time_origin = units.lstrip("nanoseconds since ")
                    else:
                        time_unit = "s"
                        time_origin = units.lstrip("seconds since ")
                except Exception:
                    time_origin = "2000-01-01 00:00:00 0:00"
            values = np.array(
                pd.to_datetime(
                    var_obj[*_slice],
                    unit=time_unit,
                    origin=time_origin,
                ),
                dtype="datetime64[ns]",
            )
        else:
            values = LazyDataset._filter_fill_value(np.array(var_obj[*_slice], dtype=dtype))

        if isinstance(rolling_w, int):
            if values.ndim == 2 and "along_track" in dims:
                values = rolling_mean_2d(values, rolling_w, axis=dims.index("along_track"))
            elif values.ndim == 1 and dims[0] == "along_track":
                values = rolling_mean_1d(values, rolling_w)

        if self.trim_to_frame:
            values = values[*_slice_frame]

        if self._nadir_index is not None:
            values = values[*_slice_across_track_valid]

        if self.to_geoid and var in self._height_vars and var != GEOID_OFFSET_VAR:
            geoid_offset = np.nan_to_num(self.get(GEOID_OFFSET_VAR).values, nan=0.0)
            _comment = attrs.get("earthcarekit", "")
            if len(_comment) > 0:
                _comment += "\n"
            attrs["earthcarekit"] = (
                f"{_comment}Converted by earthcarekit to height over geoid EGM96."
            )
            if values.ndim == 2 and dims[0] == "along_track":
                values = values - geoid_offset[:, np.newaxis]
            elif values.ndim == 1 and dims[0] == "along_track":
                values = values - geoid_offset
            elif values.ndim == 2 and dims[1] == "along_track":
                values = values - geoid_offset[np.newaxis, :]

        if str(values.dtype) == "|S1":
            values = np.array([b"".join(row).decode("utf-8").strip() for row in values])
            dims = (dims[0],)

        lvar = LazyVariable(
            varname=var,
            dims=dims,
            attrs=attrs,
            values=values,
            _dataset=self,
        )

        self._perform_default_transforms(var, lvar)

        for d, s in zip(dims, values.shape):
            self._sizes.setdefault(d, s)

        self._add_common_var(var, lvar)

        self._add_var(var, lvar)

        return lvar

    def _add_var(self, var: str, lvar: LazyVariable) -> None:
        """Stores a variable in the dataset and records it as loaded.

        Args:
            var (str): Name under which the variable is stored in `self._data`.
            lvar (LazyVariable): The variable to store.
        """
        self.logger.debug("  Adding '%s'", var)

        # An existing entry with the same name is replaced; the name is appended to the list of loaded variables either way.
        self._data[var] = lvar
        self._loaded_vars.append(var)

    def load(self, vars: str | Iterable[str] | None = None) -> "LazyDataset":
        """
        Loads the requested variables into the dataset by calling `get` for each of them.

        Args:
            vars (str | Iterable[str] | None, optional): Name of a single variable, an iterable of variable names,
                or None. If None, all names listed in `self.variables` are loaded and, if defaults are available,
                additionally every variable for which a default generator is registered. Defaults to None.

        Raises:
            TypeError: If `vars` is neither a string nor an iterable of strings.

        Returns:
            LazyDataset: This instance.
        """
        if isinstance(vars, str):
            # Checked before the iterable test: a string is itself an iterable of strings and must
            # never be split into single characters.
            self.get(vars)
            return self

        if vars is None:
            # Work on a copy so the list handed out by `self.variables` is neither extended nor iterated while loading.
            vars = list(self.variables)
            if self._defaults:
                vars = vars + list(self._defaults.generators.keys())

        if not is_iterable_of_str(vars):
            raise TypeError("expected iterable of strings")

        for var in vars:
            self.get(var)

        return self

    def get_profile(
        self,
        var: str,
        keepdims: bool = True,
    ) -> Profile:
        """
        Creates a `Profile` from a variable with the dimensions ("along_track", "vertical").

        The profile is built from the variable's values together with the dataset's "height" and "time" variables.
        "latitude" and "longitude" are added if they are listed in `self.variables`. The validation state of the
        first profile created is stored on the dataset and handed to all profiles created afterwards.

        Args:
            var (str): Name of the variable to convert.
            keepdims (bool, optional): Passed on to `Profile`. Defaults to True.

        Raises:
            RuntimeError: If the dimensions of `var` are not exactly ("along_track", "vertical").

        Returns:
            Profile: The profile created from `var`.
        """
        available = self.variables
        lvar = self.get(var)

        is_profile = lvar.dims == ("along_track", "vertical")
        if not is_profile:
            raise RuntimeError(
                f"Not a profile; '{var}' does not contain time/height data: {lvar.dims}"
            )

        def _values_or_none(name: str):
            # Optional coordinates are only read if the dataset offers them.
            if name not in available:
                return None
            return self[name].values

        profile_kwargs = dict(
            values=lvar.values,
            height=self["height"].values,
            time=self["time"].values,
            latitude=_values_or_none("latitude"),
            longitude=_values_or_none("longitude"),
            units=lvar.attrs.get("units"),
            label=lvar.attrs.get("long_name"),
            keepdims=keepdims,
            _validation=self._profile_validation_state,
        )
        profile = Profile(**profile_kwargs)

        # Remember the validation state of the first profile for later calls.
        if self._profile_validation_state is None:
            self._profile_validation_state = profile._validation

        return profile

    def to_xarray(self) -> xr.Dataset:
        """
        Converts all loaded variables to a `xr.Dataset`.

        Every name in `self._loaded_vars` is converted via its variable's `to_xarray()` and stored under the name
        of the resulting data array. The file path of this dataset is written to `encoding["source"]`.

        Returns:
            xr.Dataset: Dataset holding the loaded variables.
        """
        data_arrays = [self[name].to_xarray() for name in self._loaded_vars]

        named_arrays = {}
        for data_array in data_arrays:
            named_arrays[data_array.name] = data_array

        ds = xr.Dataset(named_arrays)
        ds.encoding["source"] = self.filepath
        return ds

    @classmethod
    def from_xarray(cls, ds: xr.Dataset) -> "LazyDataset":
        """
        Creates a `LazyDataset` from the contents of a `xr.Dataset` without reading the source file.

        The file path is taken from `ds.encoding["source"]` and `trim_to_frame` is derived from whether the
        "trim_index_offset" variable differs from 0. Every entry of `ds.variables` is registered as a loaded
        variable, and dimension sizes that are not known yet are recorded.

        Args:
            ds (xr.Dataset): Dataset to convert. Must provide `encoding["source"]` and a "trim_index_offset" variable.

        Returns:
            LazyDataset: New instance holding all variables of `ds`.
        """
        source_path = ds.encoding["source"]
        is_trimmed = ds["trim_index_offset"].values != 0
        new_lds = cls(
            filepath=source_path,
            trim_to_frame=is_trimmed,
            in_memory=False,
            _read=False,
        )

        for name in map(str, ds.variables):
            data_array = ds[name]
            dims = data_array.dims

            new_lds._data[name] = LazyVariable(
                varname=name,
                dims=cast(tuple[str, ...], dims),
                attrs=cast(dict[str, str], data_array.attrs),
                values=data_array.values,
                _dataset=new_lds,
            )
            new_lds._loaded_vars.append(name)

            # Only the first size seen for a dimension is kept.
            for dim, size in zip(dims, data_array.shape):
                new_lds._sizes.setdefault(cast(str, dim), size)

        return new_lds

    def _get_common_var(self, var: str) -> str | None:
        return {v: k for k, v in self._varname_map.items()}.get(var)

    def _add_common_var(
        self,
        var: str,
        lvar: LazyVariable,
    ) -> bool:
        """Applies the transformer of a common variable (e.g., "height", "time", "elevation", ...) if `var` refers to one.

        `var` is translated to its common name via `_get_common_var`. If such a name exists and
        `get_common_var_transformer` provides a transformer for it, the transformer is called with the common name,
        this dataset and `lvar`, and True is returned. Otherwise, nothing is done and False is returned.

        Args:
            var (str): Original name of the variable (i.e., the name as used in the original dataset file,
                e.g., "sample_altitude" in A-NOM, and not the standard name "height").
            lvar (LazyVariable): Variable instance that is handed to the transformer.

        Returns:
            bool: True if a transformer was applied. Otherwise, False.
        """
        common_var = self._get_common_var(var)
        transformer = get_common_var_transformer(common_var) if common_var else None
        if not transformer:
            return False

        transformer(common_var, self, lvar)
        return True

    def _perform_default_transforms(self, var: str, lvar: LazyVariable) -> bool:
        if self._defaults:
            func = self._defaults.transforms.get(var)
            if func:
                lvars = func(self, lvar)
                lvar = lvars[0]
                for x in lvars[1:]:
                    self._add_var(x.varname, x)
                return True
        return False

dims property

dims: list[str]

List of dimension names.

get

get(var: str) -> LazyVariable

Retrieves a variable by name.

Variables are returned under the following conditions:

  1. If the variable is already loaded.
  2. If not loaded but a generator exists for the given var, generates the variable first.
  3. Otherwise, attempts to load the variable from the underlying dataset file.

Parameters:

Name Type Description Default
var str

Name of the variable to retrieve.

required

Returns:

Name Type Description
LazyVariable LazyVariable

The requested variable.

Raises:

Type Description
KeyError

If var refers to a dimension or the variable cannot be loaded.

Source code in earthcarekit/read/lazy/_dataset.py
def get(self, var: str) -> LazyVariable:
    """Retrieves a variable by name.

    The variable is looked up in the following order:

    1. Already loaded variables are returned directly.
    2. If defaults are available and a generator (regular generators first, then optional ones) exists
       for the given `var`, the variable is generated. All generated variables that are not part of the
       dataset yet are added, and the first generated variable is returned.
    3. Otherwise, the variable is loaded from the underlying dataset file.

    Args:
        var (str): Name of the variable to retrieve.

    Returns:
        LazyVariable: The requested variable.

    Raises:
        KeyError: If `var` refers to a dimension or the variable cannot be loaded.
    """
    if var in self._data:
        return self._data[var]

    if self._defaults:
        # Registries are looked up lazily so the optional one is only touched if the regular one has no match.
        for registry_name in ("generators", "optional_generators"):
            generator = getattr(self._defaults, registry_name).get(var)
            if generator is None:
                continue

            produced = generator(self)
            for candidate in produced:
                if candidate.varname not in self._data:
                    self._add_var(candidate.varname, candidate)
            return produced[0]

    loaded = self._load_var(var)
    if loaded is None:
        raise KeyError(f"'{var}' is a dimension, not a variable")

    return loaded

is_open property

is_open: bool

Whether the underlying file is open for read.

nadir_index property

nadir_index: int | None

Index of the across-track nadir pixel or None if not applicable.

sizes property

sizes: MappingProxyType[str, int]

Mapping from dimension names to lengths.

variables property

variables: list[str]

Names of variables available for access.

Returns a list of variable names. If the underlying file is still open, the list includes both already loaded variables and variables that can be loaded lazily. Otherwise, only loaded variables are included.

ProductInfo dataclass

Class storing all info gathered from an EarthCARE product's file path.

Attributes:

Name Type Description
mission_id FileMissionID

Mission ID (ECA = EarthCARE).

agency FileAgency

Agency that generated the file (E = ESA, J = JAXA).

latency FileLatency

Latency indicator (X = not applicable, N = near real-time, O = offline).

baseline str

Two-letter product/processor version string (e.g., "BA").

file_type FileType

Full product name (10 characters, e.g., "ATL_EBD_2A").

start_sensing_time Timestamp

Start-time of data collection (i.e., time of first available data in the product).

start_processing_time Timestamp

Start-time of processing (i.e., time at which creation of the product started).

orbit_number int

Number of the orbit.

frame_id str

Single-letter identifier between A and H, indicating the orbit segment (A,B,H = night frames; D,E,F = day frames; C,G = polar day/night frames).

orbit_and_frame str

Six-character string with leading zeros combining orbit number and frame ID.

filename str

Full name of the product without file extension.

filepath str

Local file path or empty string if not available.

hdr_filepath str

Local header file path or empty string if not available.

start_latitude float

Track start latitude [deg. N].

start_longitude float

Track start longitude [deg. E].

end_latitude float

Track end latitude [deg. N].

end_longitude float

Track end longitude [deg. E].

Source code in earthcarekit/read/info/product_info.py
@dataclass
class ProductInfo:
    """
    Class storing all info gathered from an EarthCARE product's file path.

    Attributes:
        mission_id (FileMissionID):
            Mission ID (ECA = EarthCARE).
        agency (FileAgency):
            Agency that generated the file (E = ESA, J = JAXA).
        latency (FileLatency):
            Latency indicator (X = not applicable, N = near real-time, O = offline).
        baseline (str):
            Two-letter product/processor version string (e.g., "BA").
        file_type (FileType):
            Full product name (10 characters, e.g., "ATL_EBD_2A").
        start_sensing_time (pd.Timestamp):
            Start-time of data collection (i.e., time of first available data in the product).
        start_processing_time (pd.Timestamp):
            Start-time of processing (i.e., time at which creation of the product started).
        orbit_number (int):
            Number of the orbit.
        frame_id (str):
            Single-letter identifier between A and H, indicating the orbit segment
            (A,B,H = night frames; D,E,F = day frames; C,G = polar day/night frames).
        orbit_and_frame (str):
            Six-character string with leading zeros combining orbit number and frame ID.
        filename (str):
            Full name of the product without file extension.
        filepath (str):
            Local file path or empty string if not available.
        hdr_filepath (str):
            Local header file path or empty string if not available.
        start_latitude (float):
            Track start latitude [deg. N]. Defaults to NaN.
        start_longitude (float):
            Track start longitude [deg. E]. Defaults to NaN.
        end_latitude (float):
            Track end latitude [deg. N]. Defaults to NaN.
        end_longitude (float):
            Track end longitude [deg. E]. Defaults to NaN.
        url_download_h5 (str | None):
            Presumably the remote download URL of the product's HDF5 file (TODO confirm). Defaults to None.
        url_download_hdr (str | None):
            Presumably the remote download URL of the product's header file (TODO confirm). Defaults to None.
        url_quicklook (str | None):
            Presumably the URL of a quicklook image of the product (TODO confirm). Defaults to None.
    """

    mission_id: FileMissionID
    agency: FileAgency
    latency: FileLatency
    baseline: str
    file_type: FileType
    start_sensing_time: pd.Timestamp
    start_processing_time: pd.Timestamp
    orbit_number: int
    frame_id: str
    orbit_and_frame: str
    filename: str
    filepath: str
    hdr_filepath: str
    # All fields below are optional and fall back to NaN/None when not given.
    start_latitude: float = float("nan")
    start_longitude: float = float("nan")
    end_latitude: float = float("nan")
    end_longitude: float = float("nan")
    url_download_h5: str | None = None
    url_download_hdr: str | None = None
    url_quicklook: str | None = None

    def to_dict(self) -> dict:
        """Returns product info as a Python `dict`."""
        return asdict(self)

    def to_dataframe(self) -> "ProductDataFrame":
        """Returns product info as a `ProductDataFrame` built from a one-element list holding this instance."""
        return ProductDataFrame([self])

to_dataframe

to_dataframe() -> ProductDataFrame

Returns product info as a ProductDataFrame.

Source code in earthcarekit/read/info/product_info.py
def to_dataframe(self) -> "ProductDataFrame":
    """Returns product info as a `ProductDataFrame` built from a one-element list holding this instance."""
    return ProductDataFrame([self])

to_dict

to_dict() -> dict

Returns product info as a Python dict.

Source code in earthcarekit/read/info/product_info.py
def to_dict(self) -> dict:
    """Returns the product info converted to a Python `dict` via `asdict`."""
    info: dict = asdict(self)
    return info

add_depol_ratio

add_depol_ratio(
    ds_anom: Dataset,
    rolling_w: int = 20,
    near_zero_tolerance: float = 2e-07,
    smooth: bool = True,
    skip_height_above_elevation: int = 300,
    cpol_var: str = "mie_attenuated_backscatter",
    xpol_var: str = "crosspolar_attenuated_backscatter",
    elevation_var: str = ELEVATION_VAR,
    height_var: str = HEIGHT_VAR,
    height_dim: str = VERTICAL_DIM,
) -> Dataset

Compute depolarization ratio (DPOL = XPOL/CPOL) from attenuated backscatter signals.

This function derives the depol. ratio from cross-polarized (XPOL) and co-polarized (CPOL) attenuated backscatter signals. Signals below the surface are masked, by default with a vertical margin of 300 meters above the surface elevation to remove potential surface return. Also, signals are smoothed (or "cleaned") with a rolling mean, and near-zero divisions are suppressed and set to NaN instead. The resulting dataset contains the ratio curtain and a ratio profile calculated from mean profiles of the full dataset (e.g., mean(XPOL)/mean(CPOL)).

Parameters:

Name Type Description Default
ds_anom Dataset

ATL_NOM_1B dataset containing cross- and co-polar attenuated backscatter.

required
rolling_w int

Window size for rolling mean smoothing. Defaults to 20.

20
near_zero_tolerance float

Tolerance for masking near-zero CPOL (i.e., denominators). Defaults to 2e-7.

2e-07
smooth bool

Whether to apply rolling mean smoothing. Defaults to True.

True
skip_height_above_elevation int

Vertical margin above surface elevation to mask in meters. Defaults to 300.

300
cpol_var str

Input co-polar variable name. Defaults to "mie_attenuated_backscatter".

'mie_attenuated_backscatter'
xpol_var str

Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".

'crosspolar_attenuated_backscatter'
elevation_var str

Elevation variable name. Defaults to ELEVATION_VAR.

ELEVATION_VAR
height_var str

Height variable name. Defaults to HEIGHT_VAR.

HEIGHT_VAR
height_dim str

Height dimension name. Defaults to VERTICAL_DIM.

VERTICAL_DIM

Returns:

Type Description
Dataset

xr.Dataset: Dataset with added depol. ratio, cleaned signals, and depol. ratio profile from mean profiles.

Source code in earthcarekit/read/product/level1/atl_nom_1b.py
def add_depol_ratio(
    ds_anom: xr.Dataset,
    rolling_w: int = 20,
    near_zero_tolerance: float = 2e-7,
    smooth: bool = True,
    skip_height_above_elevation: int = 300,
    cpol_var: str = "mie_attenuated_backscatter",
    xpol_var: str = "crosspolar_attenuated_backscatter",
    elevation_var: str = ELEVATION_VAR,
    height_var: str = HEIGHT_VAR,
    height_dim: str = VERTICAL_DIM,
) -> xr.Dataset:
    """
    Compute depolarization ratio (`DPOL` = `XPOL`/`CPOL`) from attenuated backscatter signals.

    This is a shortcut for `add_scattering_ratio` with the formula "x/c", i.e., the ratio of the cross-polarized (`XPOL`)
    to the co-polarized (`CPOL`) attenuated backscatter signal. All other arguments are forwarded unchanged.
    Signals below the surface are masked, by default with a vertical margin of 300 meters above the surface elevation to remove potential surface return.
    Also, signals are smoothed (or "cleaned") with a rolling mean, and near-zero divisions are suppressed and set to NaN instead.
    The resulting dataset contains the ratio curtain and a ratio profile calculated from mean profiles of the full dataset (e.g., mean(`XPOL`)/mean(`CPOL`)).

    Args:
        ds_anom (xr.Dataset): ATL_NOM_1B dataset containing cross- and co-polar attenuated backscatter.
        rolling_w (int, optional): Window size for rolling mean smoothing. Defaults to 20.
        near_zero_tolerance (float, optional): Tolerance for masking near-zero `CPOL` (i.e., denominators). Defaults to 2e-7.
        smooth (bool, optional): Whether to apply rolling mean smoothing. Defaults to True.
        skip_height_above_elevation (int, optional): Vertical margin above surface elevation to mask in meters. Defaults to 300.
        cpol_var (str, optional): Input co-polar variable name. Defaults to "mie_attenuated_backscatter".
        xpol_var (str, optional): Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".
        elevation_var (str, optional): Elevation variable name. Defaults to ELEVATION_VAR.
        height_var (str, optional): Height variable name. Defaults to HEIGHT_VAR.
        height_dim (str, optional): Height dimension name. Defaults to VERTICAL_DIM.

    Returns:
        xr.Dataset: Dataset with added depol. ratio, cleaned signals, and depol. ratio profile from mean profiles.
    """
    # Options that are handed through to the generic ratio function as they are.
    forwarded = dict(
        rolling_w=rolling_w,
        near_zero_tolerance=near_zero_tolerance,
        smooth=smooth,
        skip_height_above_elevation=skip_height_above_elevation,
        cpol_var=cpol_var,
        xpol_var=xpol_var,
        elevation_var=elevation_var,
        height_var=height_var,
        height_dim=height_dim,
    )
    return add_scattering_ratio(ds_anom=ds_anom, formula="x/c", **forwarded)

add_isccp_cloud_type

add_isccp_cloud_type(
    ds: Dataset,
    new_var: str = "isccp_cloud_type",
    cot_var: str = "cloud_optical_thickness",
    cth_var: str = "cloud_top_height",
    along_track_dim: str = ALONG_TRACK_DIM,
    across_track_dim: str = ACROSS_TRACK_DIM,
) -> Dataset

Adds a variable to the dataset containing ISCCP cloud types calculated from cloud optical thickness (COT) and cloud top height (CTH).

Parameters:

Name Type Description Default
ds Dataset

A MSI_COP_2A dataset.

required
new_var str

Name of the new ISCCP cloud type variable. Defaults to "isccp_cloud_type".

'isccp_cloud_type'
cot_var str

Name of the COT variable in ds. Defaults to "cloud_optical_thickness".

'cloud_optical_thickness'
cth_var str

Name of the CTH variable in ds. Defaults to "cloud_top_height".

'cloud_top_height'
along_track_dim str

Name of the along-track dimension in ds. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
across_track_dim str

Name of the across-track dimension in ds. Defaults to ACROSS_TRACK_DIM.

ACROSS_TRACK_DIM

Returns:

Type Description
Dataset

xr.Dataset: The input dataset with added ISCCP cloud type variable.

References
  • International Satellite Cloud Climatology Project (ISCCP). ISCCP Definition of Cloud Types. Retrieved September 25, 2025. https://isccp.giss.nasa.gov/cloudtypes.html
Source code in earthcarekit/read/product/level2a/msi_cop_2a.py
def add_isccp_cloud_type(
    ds: xr.Dataset,
    new_var: str = "isccp_cloud_type",
    cot_var: str = "cloud_optical_thickness",
    cth_var: str = "cloud_top_height",
    along_track_dim: str = ALONG_TRACK_DIM,
    across_track_dim: str = ACROSS_TRACK_DIM,
) -> xr.Dataset:
    """
    Adds a variable to the dataset containing ISCCP cloud types calculated from cloud optical thickness (COT)
    and cloud top height (CTH).

    Each pixel is classified by combining one of three CTH bands ([100, 3200), [3200, 6500), [6500, 19300))
    with one of three COT bands ([0.01, 3.6), [3.6, 23), >= 23), giving the types 1 to 9. Pixels with
    0 <= COT < 0.01 are marked as clear (0), and everything else is marked as not determined (-127).
    The new variable is written to `ds` itself.

    Args:
        ds (xr.Dataset): A MSI_COP_2A dataset.
        new_var (str, optional): Name of the new ISCCP cloud type variable. Defaults to "isccp_cloud_type".
        cot_var (str, optional): Name of the COT variable in `ds`. Defaults to "cloud_optical_thickness".
        cth_var (str, optional): Name of the CTH variable in `ds`. Defaults to "cloud_top_height".
        along_track_dim (str, optional): Name of the along-track dimension in `ds`. Defaults to ALONG_TRACK_DIM.
        across_track_dim (str, optional): Name of the across-track dimension in `ds`. Defaults to ACROSS_TRACK_DIM.

    Returns:
        xr.Dataset: The input dataset with added ISCCP cloud type variable.

    References:
        - International Satellite Cloud Climatology Project (ISCCP). ISCCP Definition of Cloud Types.
        Retrieved September 25, 2025. https://isccp.giss.nasa.gov/cloudtypes.html
    """
    cot = ds[cot_var].values
    cth = ds[cth_var].values

    # Cloud top height bands: low, middle, high.
    height_bands = (
        (cth >= 100) & (cth < 3200),
        (cth >= 3200) & (cth < 6500),
        (cth >= 6500) & (cth < 19300),
    )
    # Optical thickness bands: thin, medium, thick.
    thickness_bands = (
        (cot >= 0.01) & (cot < 3.6),
        (cot >= 3.6) & (cot < 23),
        cot >= 23,
    )
    is_clear = (cot < 0.01) & (cot >= 0)

    # Everything that matches none of the classes below keeps the "not determined" marker.
    cloud_type = np.empty(shape=cot.shape, dtype=int)
    cloud_type[:, :] = -127

    # Type codes count through the height bands first, then through the thickness bands:
    # thin -> 1, 2, 3; medium -> 4, 5, 6; thick -> 7, 8, 9.
    for i_thickness, in_thickness_band in enumerate(thickness_bands):
        for i_height, in_height_band in enumerate(height_bands):
            cloud_type[in_height_band & in_thickness_band] = 3 * i_thickness + i_height + 1
    cloud_type[is_clear] = 0

    attrs = {
        "units": "",
        "long_name": "ISCCP cloud type calculated from M-COP",
        "definition": "0: Clear, 1: Cumulus, 2: Altocumulus, 3: Cirrus, 4: Stratocumulus, 5: Altostratus, 6: Cirrostratus, 7: Stratus, 8: Nimbostratus, 9: Deep convection, -127: Not determined",
        "earthcarekit": "Added by earthcarekit",
    }
    ds[new_var] = xr.DataArray(
        cloud_type,
        dims=(along_track_dim, across_track_dim),
        name=new_var,
        attrs=attrs,
    )

    return ds

add_potential_temperature

add_potential_temperature(
    ds: Dataset,
    temperature_var: str = "temperature_kelvin",
    pressure_var: str = "pressure",
    new_var: str = "potential_temperature",
) -> Dataset

Computes potential temperature from temperature [K] and pressure [Pa] and adds it to the dataset as two variables, one in kelvin and one in degrees Celsius (source: https://en.wikipedia.org/wiki/Potential_temperature, accessed: 2026-02-06).

Parameters:

Name Type Description Default
ds Dataset

Dataset (e.g., AUX_MET_1D) containing temperature [K] and pressure [Pa] data.

required
temperature_var str

Input temperature variable name. Defaults to "temperature_kelvin".

'temperature_kelvin'
pressure_var str

Input pressure variable name. Defaults to "pressure".

'pressure'
new_var str

New variable name for potential temperature. Defaults to "potential_temperature".

'potential_temperature'

Returns:

Type Description
Dataset

xr.Dataset: Dataset with 2 new variables for potential temperature profiles added (kelvin and celsius).

Source code in earthcarekit/read/product/auxiliary/aux_met_1d.py
def add_potential_temperature(
    ds: xr.Dataset,
    temperature_var: str = "temperature_kelvin",
    pressure_var: str = "pressure",
    new_var: str = "potential_temperature",
) -> xr.Dataset:
    """
    Computes potential temperature from temperature [K] and pressure [Pa] and adds it to the dataset as two variables,
    `f"{new_var}_kelvin"` and `f"{new_var}_celsius"` (source: https://en.wikipedia.org/wiki/Potential_temperature, accessed: 2026-02-06).

    The potential temperature is calculated as `T * (p0 / p) ** 0.286` with the reference pressure `p0` = 100000 Pa.
    Both new variables are copies of the temperature variable with replaced values and attributes, and are written to `ds` itself.

    Args:
        ds (xr.Dataset): Dataset (e.g., AUX_MET_1D) containing temperature [K] and pressure [Pa] data.
        temperature_var (str, optional): Input temperature variable name. Defaults to "temperature_kelvin".
        pressure_var (str, optional): Input pressure variable name. Defaults to "pressure".
        new_var (str, optional): Name prefix of the new potential temperature variables. Defaults to "potential_temperature".

    Returns:
        xr.Dataset: Dataset with 2 new variables for potential temperature profiles added (kelvin and celsius).
    """
    t = ds[temperature_var].values  # [K]
    p = ds[pressure_var].values  # [Pa]
    p0 = 100_000.0  # reference pressure [Pa]
    rcp = 0.286  # exponent of the pressure ratio (R/c_p in the referenced formula)
    # `np.power` is used because the `np.pow` alias is only available in NumPy >= 2.0.
    potential_t = t * np.power(p0 / p, rcp)

    variants = (
        ("kelvin", "K", potential_t),
        ("celsius", r"$^{\circ}$C", potential_t - 273.15),
    )
    for suffix, units, values in variants:
        name = f"{new_var}_{suffix}"
        attrs = {
            "units": units,
            "long_name": "Potential temperature",
            "name": "Potential temperature",
        }
        # The temperature variable serves as template for dimensions and coordinates; its attributes are dropped.
        ds[name] = ds[temperature_var].copy().drop_attrs().assign_attrs(attrs)
        ds[name].values = values

    return ds

add_scattering_ratio

add_scattering_ratio(
    ds_anom: Dataset,
    formula: Literal["x/c", "(c+x)/r", "(c+x+r)/r"],
    rolling_w: int = 20,
    near_zero_tolerance: float = 2e-07,
    smooth: bool = True,
    skip_height_above_elevation: int = 300,
    cpol_var: str = "mie_attenuated_backscatter",
    xpol_var: str = "crosspolar_attenuated_backscatter",
    ray_var: str = "rayleigh_attenuated_backscatter",
    elevation_var: str = ELEVATION_VAR,
    height_var: str = HEIGHT_VAR,
    height_dim: str = VERTICAL_DIM,
) -> Dataset

Compute scattering ratio from attenuated backscatter signals given a formula: "x/c", "(c+x)/r", or "(c+x+r)/r".

This function derives the scattering ratio from cross-polarized (XPOL), co-polarized (CPOL) and Rayleigh (RAY) attenuated backscatter signals. Signals below the surface are masked, by default with a vertical margin of 300 meters above the surface elevation to remove potential surface return. Also, signals are smoothed (or "cleaned") with a rolling mean, and near-zero divisions are suppressed and set to NaN instead. The resulting dataset contains the ratio curtain and a ratio profile calculated from mean profiles of the full dataset (e.g., mean(XPOL)/mean(CPOL)).

Parameters:

Name Type Description Default
ds_anom Dataset

ATL_NOM_1B dataset containing the attenuated backscatter signals.

required
formula Literal['x/c', '(c+x)/r', '(c+x+r)/r']

Formula used to calculate the scattering ratio.

required
rolling_w int

Window size for rolling mean smoothing. Defaults to 20.

20
near_zero_tolerance float

Tolerance for masking near-zero denominators. Defaults to 2e-7.

2e-07
smooth bool

Whether to apply rolling mean smoothing. Defaults to True.

True
skip_height_above_elevation int

Vertical margin above surface elevation to mask in meters. Defaults to 300.

300
cpol_var str

Input co-polar variable name. Defaults to "mie_attenuated_backscatter".

'mie_attenuated_backscatter'
xpol_var str

Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".

'crosspolar_attenuated_backscatter'
ray_var str

Input rayleigh variable name. Defaults to "rayleigh_attenuated_backscatter".

'rayleigh_attenuated_backscatter'
elevation_var str

Elevation variable name. Defaults to ELEVATION_VAR.

ELEVATION_VAR
height_var str

Height variable name. Defaults to HEIGHT_VAR.

HEIGHT_VAR
height_dim str

Height dimension name. Defaults to VERTICAL_DIM.

VERTICAL_DIM

Returns:

Type Description
Dataset

xr.Dataset: Dataset with added ratio curtain and ratio profile from mean profiles.

Source code in earthcarekit/read/product/level1/atl_nom_1b.py
def add_scattering_ratio(
    ds_anom: xr.Dataset,
    formula: Literal["x/c", "(c+x)/r", "(c+x+r)/r"],
    rolling_w: int = 20,
    near_zero_tolerance: float = 2e-7,
    smooth: bool = True,
    skip_height_above_elevation: int = 300,
    cpol_var: str = "mie_attenuated_backscatter",
    xpol_var: str = "crosspolar_attenuated_backscatter",
    ray_var: str = "rayleigh_attenuated_backscatter",
    elevation_var: str = ELEVATION_VAR,
    height_var: str = HEIGHT_VAR,
    height_dim: str = VERTICAL_DIM,
) -> xr.Dataset:
    """
    Compute scattering ratio from attenuated backscatter signals given a formula: "x/c", "(c+x)/r", or "(c+x+r)/r".

    This function derives the scattering ratio from cross-polarized (`XPOL`), co-polarized (`CPOL`) and rayleigh (`RAY`) attenuated backscatter signals.
    Signals below the surface are masked, by default with a vertical margin of 300 meters above elevation to remove potential surface return.
    Also, signals are smoothed (or "cleaned") with a rolling mean, and near-zero divisions are suppressed and set to NaN instead.
    The resulting dataset contains the ratio curtain and a ratio profile calculated from mean profiles of the full dataset (e.g., mean(`XPOL`)/mean(`CPOL`)).

    The formula is matched case-insensitively (e.g., "X/C" is treated like "x/c").

    Args:
        ds_anom (xr.Dataset): ATL_NOM_1B dataset containing the attenuated backscatter signals. The dataset is extended in place and returned.
        formula (Literal["x/c", "(c+x)/r", "(c+x+r)/r"]): Formula used to calculate the scattering ratio.
        rolling_w (int, optional): Window size for rolling mean smoothing. Defaults to 20.
        near_zero_tolerance (float, optional): Tolerance for masking near-zero denominators. Only applied if `smooth` is True. Defaults to 2e-7.
        smooth (bool, optional): Whether to apply rolling mean smoothing. Defaults to True.
        skip_height_above_elevation (int, optional): Vertical margin above surface elevation to mask in meters. Defaults to 300.
        cpol_var (str, optional): Input co-polar variable name. Defaults to "mie_attenuated_backscatter".
        xpol_var (str, optional): Input cross-polar variable name. Defaults to "crosspolar_attenuated_backscatter".
        ray_var (str, optional): Input rayleigh variable name. Defaults to "rayleigh_attenuated_backscatter".
        elevation_var (str, optional): Elevation variable name. Defaults to ELEVATION_VAR.
        height_var (str, optional): Height variable name. Defaults to HEIGHT_VAR.
        height_dim (str, optional): Height dimension name. Defaults to VERTICAL_DIM.

    Returns:
        xr.Dataset: Dataset with added ratio curtain and ratio profile from mean profiles.

    Raises:
        ValueError: If `formula` is not one of the supported formulas.
    """

    # Normalize once so that the validation and all formula-dependent branches
    # below agree. Previously, e.g., "X/C" passed validation but matched none
    # of the branches, which silently produced `None` results.
    formula_key = formula.lower()
    if formula_key not in ("x/c", "(c+x)/r", "(c+x+r)/r"):
        raise ValueError(f"invalid formula '{formula}', expected 'x/c', '(c+x)/r' or '(c+x+r)/r'")

    cpol_cleaned_var: str = "cpol_cleaned_for_ratio_calculation"
    xpol_cleaned_var: str = "xpol_cleaned_for_ratio_calculation"
    ray_cleaned_var: str = "ray_cleaned_for_ratio_calculation"

    long_name = {
        "x/c": "Depol. ratio from cross- and co-polar atten. part. bsc.",
        "(c+x)/r": "Total part. to ray. atten. bsc. ratio",
        "(c+x+r)/r": "Total to ray. atten. bsc. ratio",
    }[formula_key]
    ratio_var = {
        "x/c": "depol_ratio",
        "(c+x)/r": "cpol_xpol_to_ray_ratio",
        "(c+x+r)/r": "cpol_xpol_ray_to_ray_ratio",
    }[formula_key]
    ratio_from_means_var = f"{ratio_var}_from_means"

    def _calc(c, x, r):
        # Exact zeros in the denominator are replaced by NaN to avoid division by zero.
        if formula_key == "x/c":
            return x / np.where(c == 0, np.nan, c)
        if formula_key == "(c+x)/r":
            return (c + x) / np.where(r == 0, np.nan, r)
        return (c + x + r) / np.where(r == 0, np.nan, r)

    def _get_near_zero_mask(c, x, r):
        # Only the denominator of the selected formula is checked.
        denominator = c if formula_key == "x/c" else r
        return np.isclose(denominator, 0, atol=near_zero_tolerance)

    cpol_da = ds_anom[cpol_var].copy()
    xpol_da = ds_anom[xpol_var].copy()
    ray_da = ds_anom[ray_var].copy()

    # Create the ratio variable from the raw signals first, so it exists in the
    # dataset; its values are replaced by the cleaned ratio further below.
    ds_anom[ratio_var] = _calc(cpol_da, xpol_da, ray_da)
    rename_var_info(
        ds_anom,
        ratio_var,
        name=ratio_var,
        long_name=long_name,
        units="",
    )

    # NOTE(review): only the first height profile (`data[0]`) is compared against
    # the per-profile elevation; this presumably assumes that heights are
    # (nearly) identical for all profiles -- confirm.
    elevation = ds_anom[elevation_var].data.copy()[:, np.newaxis] + skip_height_above_elevation
    mask_surface = ds_anom[height_var].data[0].copy() < elevation

    # NOTE(review): `.data` is not copied here, so for NumPy-backed variables the
    # surface masking below presumably also modifies the input variables of
    # `ds_anom` in place -- confirm that this is intended.
    cpol = ds_anom[cpol_var].data
    xpol = ds_anom[xpol_var].data
    ray = ds_anom[ray_var].data

    cpol[mask_surface] = np.nan
    xpol[mask_surface] = np.nan
    ray[mask_surface] = np.nan

    if smooth:
        cpol = rolling_mean_2d(cpol, rolling_w, axis=0)
        xpol = rolling_mean_2d(xpol, rolling_w, axis=0)
        ray = rolling_mean_2d(ray, rolling_w, axis=0)

    ds_anom[ratio_var].data = _calc(cpol, xpol, ray)
    ds_anom[ratio_var] = ds_anom[ratio_var].assign_attrs(
        {
            "earthcarekit": "Added by earthcarekit: Intended for use in curtain plots only!",
        }
    )

    if smooth:
        near_zero_mask = _get_near_zero_mask(cpol, xpol, ray)
        ds_anom[ratio_var].data[near_zero_mask] = np.nan
        cpol[near_zero_mask] = np.nan
        xpol[near_zero_mask] = np.nan
        ray[near_zero_mask] = np.nan

    # Store the cleaned input signals next to the originals.
    cleaned_note = f"Added by earthcarekit: Rolling mean applied (w={rolling_w}) and near-zero values removed (tolerance={near_zero_tolerance})"
    for source_var, cleaned_var, cleaned_values in (
        (xpol_var, xpol_cleaned_var, xpol),
        (cpol_var, cpol_cleaned_var, cpol),
        (ray_var, ray_cleaned_var, ray),
    ):
        ds_anom[cleaned_var] = ds_anom[source_var].copy()
        ds_anom[cleaned_var].data = cleaned_values
        ds_anom[cleaned_var] = ds_anom[cleaned_var].assign_attrs({"earthcarekit": cleaned_note})

    # Ratio profile: apply the formula to the mean profiles (mean over axis 0).
    ratio_mean = _calc(
        nan_mean(cpol, axis=0),
        nan_mean(xpol, axis=0),
        nan_mean(ray, axis=0),
    )

    ds_anom[ratio_from_means_var] = xr.DataArray(
        data=ratio_mean,
        dims=[height_dim],
        attrs={
            "long_name": long_name,
            "units": "",
            "earthcarekit": "Added by earthcarekit: Scattering ratio profile calculated from the mean profiles",
        },
    )

    return ds_anom

get_product_info

get_product_info(
    filepath: str, warn: bool = False, must_exist: bool = True, read_geo_from_hdr: bool = False
) -> ProductInfo

Gather all info contained in the EarthCARE product's file path.

Source code in earthcarekit/read/info/product_info.py
def get_product_info(
    filepath: str,
    warn: bool = False,
    must_exist: bool = True,
    read_geo_from_hdr: bool = False,
) -> ProductInfo:
    """Gather all info contained in the EarthCARE product's file path.

    Args:
        filepath (str): Path or URL of an EarthCARE product (`.h5`) or header (`.HDR`) file.
            For URLs, the existence check is skipped.
        warn (bool, optional): If True, warns when the product file or its header file
            is missing next to the given path. Defaults to False.
        must_exist (bool, optional): If True, the file must exist on disk and its name
            must end with `.h5`. Defaults to True.
        read_geo_from_hdr (bool, optional): If True, the geographic extent is read from the
            `.HDR` file next to the product, otherwise it is set to NaN. Defaults to False.

    Returns:
        ProductInfo: Metadata parsed from the file name. For files only matching the
            orbit file name schema, orbit number and frame ID are left empty.

    Raises:
        FileNotFoundError: If `must_exist` is True and the file does not exist.
        ValueError: If the file name matches neither the product nor the orbit file schema.
    """
    if is_url(filepath):
        filepath = _get_path_from_url(filepath)
        must_exist = False

    filepath = os.path.abspath(filepath)

    if must_exist and not os.path.exists(filepath):
        raise FileNotFoundError(f"File does not exist: {filepath}")

    if must_exist:
        pattern = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._..._.._\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{5}[ABCDEFGH]\.h5"
        )
    else:
        pattern = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._..._.._\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{5}[ABCDEFGH].*"
        )
    is_match = bool(pattern.fullmatch(filepath))

    # Path without the ".h5"/".HDR" extension. `str.removesuffix` is used instead
    # of `str.rstrip(".h5")`, because `rstrip` strips a *set of characters*
    # (e.g., it would also remove a trailing "5" of the file name) and would
    # not remove a ".HDR" extension.
    base_filepath = filepath.removesuffix(".h5").removesuffix(".HDR")

    def _get_geo_extent():
        """Return (start_lat, start_lon, end_lat, end_lon) from the HDR file, or NaNs if not requested."""
        if read_geo_from_hdr:
            return safe_read_geo_extent_from_hdr(base_filepath + ".HDR")
        nan = float("nan")
        return nan, nan, nan, nan

    if not is_match:
        # Fall back to the orbit file name schema, which has no orbit/frame suffix.
        pattern_orbit_file = re.compile(
            r".*ECA_[EJ][XNO][A-Z]{2}_..._......_\d{8}T\d{6}Z_\d{8}T\d{6}Z_\d{4}.*"
        )
        is_match = bool(pattern_orbit_file.fullmatch(filepath))

        if not is_match:
            raise ValueError(f"EarthCARE product has invalid file name: {filepath}")

        filename = os.path.basename(filepath).removesuffix(".h5")
        mission_id = FileMissionID.from_input(filename[0:3])
        agency = FileAgency.from_input(filename[4])
        latency = FileLatency.from_input(filename[5])
        baseline = filename[6:8]
        file_type = FileType.from_input(filename[9:19])
        # Unparsable time stamps are tolerated for orbit files and stored as NaT.
        start_sensing_time: pd.Timestamp
        try:
            start_sensing_time = pd.Timestamp(filename[20:35])
        except ValueError:
            start_sensing_time = pd.NaT  # type: ignore
        start_processing_time: pd.Timestamp
        try:
            start_processing_time = pd.Timestamp(filename[37:52])
        except ValueError:
            start_processing_time = pd.NaT  # type: ignore

        start_latitude, start_longitude, end_latitude, end_longitude = _get_geo_extent()

        info = ProductInfo(
            mission_id=mission_id,
            agency=agency,
            latency=latency,
            baseline=baseline,
            file_type=file_type,
            start_sensing_time=start_sensing_time,
            start_processing_time=start_processing_time,
            orbit_number=0,
            frame_id="",
            orbit_and_frame="",
            filename=filename,
            filepath=filepath,
            hdr_filepath="",
            start_latitude=start_latitude,
            start_longitude=start_longitude,
            end_latitude=end_latitude,
            end_longitude=end_longitude,
        )

        return info

    # Missing product/header files are reported as empty paths.
    product_filepath = base_filepath + ".h5"
    if not os.path.exists(product_filepath):
        if warn:
            msg = f"Missing product file: {product_filepath}"
            warnings.warn(msg)
        product_filepath = ""

    hdr_filepath = base_filepath + ".HDR"
    if not os.path.exists(hdr_filepath):
        if warn:
            msg = f"Missing product header file: {hdr_filepath}"
            warnings.warn(msg)
        hdr_filepath = ""

    # All fields are read from fixed character positions of the file name.
    filename = os.path.basename(filepath).removesuffix(".h5").removesuffix(".HDR")
    mission_id = FileMissionID.from_input(filename[0:3])
    agency = FileAgency.from_input(filename[4])
    latency = FileLatency.from_input(filename[5])
    baseline = filename[6:8]
    file_type = FileType.from_input(filename[9:19])
    start_sensing_time = pd.Timestamp(filename[20:35])
    start_processing_time = pd.Timestamp(filename[37:52])
    orbit_number = int(filename[54:59])
    frame_id = filename[59]
    orbit_and_frame = filename[54:60]

    start_latitude, start_longitude, end_latitude, end_longitude = _get_geo_extent()

    info = ProductInfo(
        mission_id=mission_id,
        agency=agency,
        latency=latency,
        baseline=baseline,
        file_type=file_type,
        start_sensing_time=start_sensing_time,
        start_processing_time=start_processing_time,
        orbit_number=orbit_number,
        frame_id=frame_id,
        orbit_and_frame=orbit_and_frame,
        filename=filename,
        filepath=product_filepath,
        hdr_filepath=hdr_filepath,
        start_latitude=start_latitude,
        start_longitude=start_longitude,
        end_latitude=end_latitude,
        end_longitude=end_longitude,
    )

    return info

get_product_infos

get_product_infos(
    filepaths: str | list[str] | NDArray | DataFrame | Dataset,
    warn: bool = False,
    must_exist: bool = True,
    read_geo_from_hdr: bool = False,
) -> ProductDataFrame

Extracts product metadata from EarthCARE product file paths (e.g. file_type, orbit_number, frame_id, baseline, ...).

Parameters:

Name Type Description Default
filepaths str | list[str] | NDArray | DataFrame | Dataset

Input sources for EarthCARE product files. Can be one of - str -> A single file path. - list[str] or numpy.ndarray -> A list or array of file paths. - pandas.DataFrame -> Must contain a 'filepath' column. - xarray.Dataset -> Must have encoding with attribute 'source' (str) or 'sources' (list[str]).

required

Returns:

Name Type Description
ProductDataFrame ProductDataFrame

A dataframe containing extracted product information.

Source code in earthcarekit/read/info/product_info.py
def get_product_infos(
    filepaths: str | list[str] | NDArray | pd.DataFrame | xr.Dataset,
    warn: bool = False,
    must_exist: bool = True,
    read_geo_from_hdr: bool = False,
) -> "ProductDataFrame":
    """
    Collects the metadata encoded in EarthCARE product file paths (e.g. file_type, orbit_number, frame_id, baseline, ...).

    Paths for which `get_product_info` raises a `ValueError` are skipped.

    Args:
        filepaths:
            Input sources for EarthCARE product files. Can be one of
            - `str` -> A single file path.
            - `list[str]` or `numpy.ndarray` -> A list or array of file paths.
            - `pandas.DataFrame` -> Must contain a 'filepath' column.
            - `xarray.Dataset` -> Must have encoding with attribute 'source' (`str`) or 'sources' (`list[str]`).
        warn: Forwarded to `get_product_info`.
        must_exist: Forwarded to `get_product_info`.
        read_geo_from_hdr: Forwarded to `get_product_info`.

    Returns:
        ProductDataFrame: A dataframe containing extracted product information.

    Raises:
        ValueError: If a given dataset has no usable encoding or a given dataframe has no 'filepath' column.
    """
    path_items: list[str] | NDArray

    # Resolve the different input types into a plain iterable of paths.
    if isinstance(filepaths, xr.Dataset):
        if not hasattr(filepaths, "encoding"):
            raise ValueError("Dataset missing encoding attribute.")
        encoding = filepaths.encoding
        if "source" in encoding:
            path_items = [encoding["source"]]
        elif "sources" in encoding:
            path_items = encoding["sources"]
        else:
            raise ValueError("Dataset encoding does not contain source or sources.")
    elif isinstance(filepaths, pd.DataFrame):
        if "filepath" not in filepaths:
            raise ValueError(
                """Given dataframe does not contain a column of file paths. A valid file path column name is "filepath"."""
            )
        path_items = filepaths["filepath"].to_numpy()
    elif isinstance(filepaths, (str, np.str_)):
        path_items = [str(filepaths)]
    else:
        path_items = filepaths

    records = []
    for path in path_items:
        try:
            product_info = get_product_info(
                path,
                warn=warn,
                must_exist=must_exist,
                read_geo_from_hdr=read_geo_from_hdr,
            )
            record = product_info.to_dict()
        except ValueError:
            # Best effort: ignore paths that cannot be parsed.
            continue
        records.append(record)

    product_df = ProductDataFrame(records)
    product_df.validate_columns()
    return product_df

read_any

read_any(input: str | Dataset, **kwargs) -> Dataset

Reads various input types and returns an xarray.Dataset.

This function can read
  • EarthCARE product files (.h5)
  • NetCDF files (.nc)
  • Manually processed PollyXT output files (.txt)

Parameters:

Name Type Description Default
input str | Dataset

File path or existing Dataset.

required
**kwargs

Additional keyword arguments for specific readers.

{}

Returns:

Type Description
Dataset

xr.Dataset: Opened dataset.

Raises:

Type Description
ValueError

If the file type is not supported.

TypeError

If the input type is invalid.

Source code in earthcarekit/read/any.py
def read_any(input: str | xr.Dataset, **kwargs) -> xr.Dataset:
    """Opens one of several supported inputs as an `xarray.Dataset`.

    Supported inputs are:
        - EarthCARE product files (`.h5`)
        - NetCDF files (`.nc`)
        - Manually processed PollyXT output files (`.txt`)

    Args:
        input (str | xr.Dataset): File path or existing Dataset (returned unchanged).
        **kwargs: Additional keyword arguments passed on to the EarthCARE product
            and NetCDF readers (not used for PollyXT text files).

    Returns:
        xr.Dataset: Opened dataset.

    Raises:
        ValueError: If the file type is not supported.
        TypeError: If the input type is invalid.
    """
    if isinstance(input, xr.Dataset):
        return input
    if not isinstance(input, str):
        raise TypeError(f"Invalid type '{type(input).__name__}' for input.")

    # EarthCARE products are detected first; everything else is dispatched
    # based on the (case-insensitive) file extension.
    if is_earthcare_product(filepath=input):
        return read_product(input, **kwargs)

    extension = os.path.splitext(os.path.basename(input))[1].lower()
    if extension == ".txt":
        return read_polly(input)
    if extension == ".nc":
        return read_nc(input, **kwargs)

    raise ValueError(f"Reading of file not supported: <{input}>")

read_header_data

read_header_data(source: str) -> Dataset
read_header_data(source: Dataset) -> Dataset
read_header_data(source: str | Dataset) -> Dataset

Opens the product header groups of a EarthCARE file as a xarray.Dataset.

Source code in earthcarekit/read/header.py
def read_header_data(source: str | xr.Dataset) -> xr.Dataset:
    """Opens the product header groups of an EarthCARE file as a `xarray.Dataset`.

    Args:
        source (str | xr.Dataset): File path, or a dataset whose `encoding["source"]`
            holds the file path.

    Returns:
        xr.Dataset: Merged dataset of all groups whose name contains "HeaderData".

    Raises:
        ValueError: If a given dataset has no "source" entry in its encoding.
        TypeError: If `source` is neither a string nor a dataset.
    """
    if not isinstance(source, (str, xr.Dataset)):
        raise TypeError("Expected 'str' or 'xarray.Dataset'")

    if isinstance(source, str):
        filepath = source
    else:
        filepath = source.encoding.get("source", None)
        if filepath is None:
            raise ValueError("Dataset missing source attribute")

    groups = xr.open_groups(filepath)

    # Variables that occur in more than one header group are prefixed with the
    # last path component of the later group, so that merging does not clash.
    seen_vars = set()
    header_datasets = []
    for group_name, group_ds in groups.items():
        if "HeaderData" not in group_name:
            continue
        renamed_ds = group_ds.copy()
        for var in group_ds.data_vars:
            if var in seen_vars:
                renamed_ds = renamed_ds.rename({var: f"{group_name.split('/')[-1]}_{var}"})
            else:
                seen_vars.add(var)
        header_datasets.append(renamed_ds)

    ds = xr.merge(header_datasets)

    # Convert timestamps to numpy datetime (after removing the "UTC=" prefix)
    time_vars = (
        "Creation_Date",
        "Validity_Start",
        "Validity_Stop",
        "ANXTime",
        "frameStartTime",
        "frameStopTime",
        "processingStartTime",
        "processingStopTime",
        "sensingStartTime",
        "sensingStopTime",
        "stateVectorTime",
    )
    for time_var in time_vars:
        if time_var not in ds:
            continue
        without_prefix = np.char.replace(ds[time_var].values, "UTC=", "")
        ds[time_var].values = without_prefix.astype("datetime64[ns]")

    # Ensure that strings are correctly decoded
    if "frameID" in ds:
        ds = convert_scalar_var_to_str(ds, "frameID")

    # Remove dimensions of size == 1
    return ds.squeeze()

read_nc

read_nc(input: str | Dataset, modify: bool = True, in_memory: bool = False, **kwargs) -> Dataset

Returns an xarray.Dataset from a Dataset or NetCDF file path, optionally loaded into memory.

Parameters:

Name Type Description Default
input Dataset or str

Path to a NetCDF file. If an already opened xarray.Dataset object is passed, it is returned as is.

required
modify bool

If True, default modifications to the opened dataset will be applied (e.g., converting heights in Polly data from height a.g.l. to height above mean sea level).

True
in_memory bool

If True, ensures the dataset is fully loaded into memory. Defaults to False.

False
**kwargs

Key-word arguments passed to xarray.open_dataset().

{}

Returns:

Type Description
Dataset

xarray.Dataset: The resulting dataset.

Raises:

Type Description
TypeError

If input is not a Dataset or string.

Source code in earthcarekit/read/netcdf.py
def read_nc(
    input: str | xr.Dataset,
    modify: bool = True,
    in_memory: bool = False,
    **kwargs,
) -> xr.Dataset:
    """Opens a NetCDF file as an `xarray.Dataset`, optionally loaded into memory.

    Args:
        input (xarray.Dataset or str): Path to a NetCDF file. If an already opened `xarray.Dataset` object is passed, it is returned as is.
        modify (bool): If True, default modifications to the opened dataset will be applied
            (e.g., converting heights in Polly data from height a.g.l. to height above mean sea level).
        in_memory (bool, optional): If True, ensures the dataset is fully loaded into memory. Defaults to False.
        **kwargs: Key-word arguments passed to `xarray.open_dataset()`.

    Returns:
        xarray.Dataset: The resulting dataset.

    Raises:
        TypeError: If input is not a Dataset or string.
    """
    if isinstance(input, xr.Dataset):
        return input

    if not isinstance(input, str):
        raise TypeError(
            "Invalid input type! Expecting a opened NetCDF dataset (xarray.Dataset) or a path to a NetCDF file."
        )

    if not in_memory:
        return _read_nc(input, modify=modify, **kwargs)

    # Load all data while the file is open; the context manager then closes it.
    with _read_nc(input, modify=modify, **kwargs) as opened_ds:
        loaded_ds = opened_ds.load()
    return loaded_ds

read_polly

read_polly(input: str | Dataset) -> Dataset

Reads manually processed PollyXT output text files as xarray.Dataset or returns an already open one.

Source code in earthcarekit/read/pollynet.py
def read_polly(input: str | xr.Dataset) -> xr.Dataset:
    """Reads a manually processed PollyXT output text file as `xarray.Dataset`.

    An already opened dataset is returned unchanged. Otherwise the tab-separated
    file is parsed, its "height" column becomes the "vertical" coordinate, a
    placeholder "time" variable is added if missing, and columns given in "km"
    or with an "M" unit prefix are rescaled (with the unit string adjusted).
    """
    if isinstance(input, xr.Dataset):
        return input

    with open(input, "r", encoding="utf-8", errors="ignore") as text_file:
        table = pd.read_csv(text_file, sep="\t")

    columns = [_parse_column_name(raw_name) for raw_name in table.columns]
    unique_names = _make_column_names_unique([col.name for col in columns])
    table.columns = pd.Index(unique_names)

    ds = xr.Dataset.from_dataframe(table)
    ds = ds.assign_coords(index=ds.height.values).rename({"index": "vertical"})

    if "time" not in ds:
        # Placeholder time stamp (Unix epoch) for files without time information.
        ds = ds.assign({"time": np.datetime64("1970-01-01T00:00:00.000", "ms")})

    # Put "time" first, followed by all remaining variables in their original order.
    ds = ds[["time", *(name for name in ds.data_vars if name != "time")]]

    mega_units = ("Mm-1 sr-1", "Mm-1", "Msr-1")
    for col in columns:
        if col.units == "km":
            ds[col.name].values = ds[col.name].values * 1e3
            col.units = col.units.replace("k", "")
        elif col.units in mega_units:
            ds[col.name].values = ds[col.name].values / 1e6
            col.units = col.units.replace("M", "")

        ds[col.name] = ds[col.name].assign_attrs(
            {
                "long_name": col.long_name,
                "units": col.units,
            }
        )

    return ds

read_product

read_product(
    input: str | Dataset,
    trim_to_frame: bool = True,
    modify: bool = DEFAULT_READ_EC_PRODUCT_MODIFY,
    header: bool = DEFAULT_READ_EC_PRODUCT_HEADER,
    meta: bool = DEFAULT_READ_EC_PRODUCT_META,
    ensure_nans: bool = DEFAULT_READ_EC_PRODUCT_ENSURE_NANS,
    in_memory: bool = False,
    to_geoid: bool = False,
    origin: Literal["native", "derived"] | None = None,
    try_lazy: bool = True,
    **kwargs
) -> Dataset

Returns an xarray.Dataset from a Dataset or EarthCARE file path, optionally loaded into memory.

Parameters:

Name Type Description Default
input str or Dataset

Path to a EarthCARE file. If a xarray.Dataset is given it will be returned as is.

required
trim_to_frame bool

Whether to trim the dataset to latitude frame bounds. Defaults to True.

True
modify bool

If True, default modifications to the opened dataset will be applied (e.g., renaming dimension corresponding to height to "vertical"). Defaults to True.

DEFAULT_READ_EC_PRODUCT_MODIFY
header bool

If True, all header data will be included in the dataframe. Defaults to False.

DEFAULT_READ_EC_PRODUCT_HEADER
meta bool

If True, select meta data from header (like orbit number and frame ID) will be included in the dataframe. Defaults to True.

DEFAULT_READ_EC_PRODUCT_META
ensure_nans bool

If True, ensures that _FillValues are set to NaNs even if encoding of _FillValues or dtype is missing. Be aware, if True increases reading time. Defaults to True.

DEFAULT_READ_EC_PRODUCT_ENSURE_NANS
in_memory bool

If True, ensures the dataset is fully loaded into memory. Defaults to False.

False
to_geoid bool

If True, converts variables representing height/altitude values from HAE (WGS84) to AMSL (EGM96) using the geoid_offset variable. Defaults to False.

False
origin Literal['native', 'derived'] | None

Product origin identifier.

  • "native": file is an original EarthCARE product.
  • "derived": file was generated from a native product through post-processing or transformation (e.g., nadir cross-sections of AUX_MET_1C).
  • None: automatically detect the origin from the filename schema.

Defaults to None.

None
try_lazy bool

If True, first attempts to read using LazyDataset, which is typically the fastest option and supports streaming data access via MAAP. On failure, falls back to "legacy" xarray reader (i.e., slower and no data streaming support). Defaults to True.

True

Returns:

Type Description
Dataset

xarray.Dataset: The resulting dataset.

Raises:

Type Description
TypeError

If input is not a Dataset or string.

Source code in earthcarekit/read/product/_generic.py
def read_product(
    input: str | Dataset,
    trim_to_frame: bool = True,
    modify: bool = DEFAULT_READ_EC_PRODUCT_MODIFY,
    header: bool = DEFAULT_READ_EC_PRODUCT_HEADER,
    meta: bool = DEFAULT_READ_EC_PRODUCT_META,
    ensure_nans: bool = DEFAULT_READ_EC_PRODUCT_ENSURE_NANS,
    in_memory: bool = False,
    to_geoid: bool = False,
    origin: Literal["native", "derived"] | None = None,
    try_lazy: bool = True,
    **kwargs,
) -> Dataset:
    """Opens an EarthCARE product as an `xarray.Dataset`, optionally loaded into memory.

    Args:
        input (str or xarray.Dataset):
            Path to an EarthCARE file. If a `xarray.Dataset` is given it will be returned as is.
        trim_to_frame (bool, optional):
            Whether to trim the dataset to latitude frame bounds. Defaults to True.
        modify (bool, optional):
            If True, default modifications to the opened dataset will be applied
            (e.g., renaming dimension corresponding to height to "vertical").
            Defaults to DEFAULT_READ_EC_PRODUCT_MODIFY.
        header (bool, optional):
            If True, all header data will be included in the dataset.
            Defaults to DEFAULT_READ_EC_PRODUCT_HEADER.
        meta (bool, optional):
            If True, select meta data from header (like orbit number and frame ID) will be
            included in the dataset. Defaults to DEFAULT_READ_EC_PRODUCT_META.
        ensure_nans (bool, optional):
            If True, ensures that _FillValues are set to NaNs even if encoding of _FillValues or
            dtype is missing. Be aware, if True increases reading time.
            Defaults to DEFAULT_READ_EC_PRODUCT_ENSURE_NANS.
        in_memory (bool, optional):
            If True, ensures the dataset is fully loaded into memory. Only used by the
            `xarray`-based reader; the `LazyDataset` reader is always called with
            `in_memory=True`. Defaults to False.
        to_geoid (bool, optional):
            If True, converts variables representing height/altitude values from HAE (WGS84) to
            AMSL (EGM96) using the `geoid_offset` variable. Only passed to the `LazyDataset`
            reader. Defaults to False.
        origin (Literal["native", "derived"] | None, optional):
            Product origin identifier. Only passed to the `LazyDataset` reader.

            - `"native"`: file is an original EarthCARE product.
            - `"derived"`: file was generated from a native product through post-processing or
                transformation (e.g., nadir cross-sections of `AUX_MET_1C`).
            - None: automatically detect the origin from the filename schema.

            Defaults to None.
        try_lazy (bool, optional):
            If True, first attempts to read using `LazyDataset`, which is typically the fastest
            option and supports streaming data access via MAAP. `LazyDataset` is only used if
            the file type is supported and `modify`, `meta` and `ensure_nans` are True and
            `header` is False; otherwise the "legacy" `xarray` reader is used (i.e., slower
            and no data streaming support). Defaults to True.
        **kwargs: Additional keyword arguments passed to the `xarray`-based reader.

    Returns:
        xarray.Dataset: The resulting dataset.

    Raises:
        TypeError: If input is not a Dataset or string.
    """
    if isinstance(input, Dataset):
        return input

    if not isinstance(input, str):
        raise TypeError(
            "Invalid input type! Expecting a opened EarthCARE dataset (xarray.Dataset) or a path to a EarthCARE product."
        )

    if try_lazy:
        try:
            detected_file_type = get_file_info_from_str(input)["file_type"]
            lazy_supported = detected_file_type in LazyDataset.get_supported_file_types()
        except ValueError:
            lazy_supported = False

        if not lazy_supported:
            logging.getLogger().info(
                "`LazyDataset` reader don't support file_type; fall back to `xarray`-based reader"
            )
        elif modify is True and header is False and meta is True and ensure_nans is True:
            # The lazy reader is only used for this specific option combination.
            return LazyDataset(
                input,
                in_memory=True,
                trim_to_frame=trim_to_frame,
                to_geoid=to_geoid,
                origin=origin,
            ).to_xarray()

    # Fallback: `xarray`-based reader.
    reader_kwargs = dict(
        trim_to_frame=trim_to_frame,
        modify=modify,
        header=header,
        meta=meta,
        ensure_nans=ensure_nans,
        **kwargs,
    )

    if not in_memory:
        return _read_product(filepath=input, **reader_kwargs)

    # Load all data while the file is open; the context manager then closes it.
    with _read_product(filepath=input, **reader_kwargs) as opened_ds:
        loaded_ds = opened_ds.load()
    return loaded_ds

read_products

read_products(
    filepaths: Sequence[str] | NDArray[str_] | DataFrame,
    zoom_at: float | None = None,
    along_track_dim: str = ALONG_TRACK_DIM,
    func: Callable | None = None,
    func_inputs: Sequence[dict] | None = None,
    max_num_files: int = 8,
    coarsen: bool = True,
) -> Dataset

Read and concatenate a sequence of EarthCARE frames into a single xarray Dataset.

By default, the dataset is coarsened according to the number of input frames (e.g., combining 3 products averages every 3 profiles, so the along-track dimension remains comparable to a single product). Optionally applies a processing function to each frame and zooms in on a specific region (defined by zoom_at) without coarsening. Coarsening can also be turned off but might cause memory issues.

Parameters:

Name Type Description Default
filepaths Sequence[str] or DataFrame

EarthCARE product file paths as a list or a DataFrame with metadata including filepath, orbit_number, and frame_id.

required
zoom_at float

If set, selects only a zoomed-in portion of the frames around this fractional index. Defaults to None.

None
along_track_dim str

Name of the dimension to concatenate along. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
func Callable

Function to apply to each frame after loading. Defaults to None.

None
func_inputs Sequence[dict]

Optional per-frame arguments to pass to func. Defaults to None.

None
max_num_files int

Max. number of files that are allowed to be loaded at once. A ValueError is raised if above. Defaults to 8 (e.g., full orbit).

8
coarsen bool

If True, the read datasets are coarsened depending on the number of given files. Only applicable when not zooming. Defaults to True.

True

Returns:

Name Type Description
Dataset Dataset

Concatenated dataset with all frames along along_track_dim.

Source code in earthcarekit/read/product/_concat.py
def read_products(
    filepaths: Sequence[str] | NDArray[np.str_] | pd.DataFrame,
    zoom_at: float | None = None,
    along_track_dim: str = ALONG_TRACK_DIM,
    func: Callable | None = None,
    func_inputs: Sequence[dict] | None = None,
    max_num_files: int = 8,
    coarsen: bool = True,
) -> Dataset:
    """Read and concatenate a sequence of EarthCARE frames into a single xarray Dataset.

    By default, the dataset is coarsened according to the number of input frames (e.g.,
    combining 3 products averages every 3 profiles, so the along-track dimension remains
    comparable to a single product). Optionally applies a processing function to each
    frame and zooms in on a specific region (defined by `zoom_at`) without coarsening.
    Coarsening can also be turned off, but this might cause memory issues.

    Args:
        filepaths (Sequence[str] or pandas.DataFrame):
            EarthCARE product file paths as a list or a DataFrame with metadata
            including `filepath`, `orbit_number`, and `frame_id`. Inputs are sorted
            by their `orbit_and_frame` column before reading.
        zoom_at (float, optional):
            If set, selects only a zoomed-in portion of the frames around this
            fractional index. The floor and ceiling of this value are used to index
            the sorted file list (plain list indexing, so negative values wrap
            around like Python indices). Defaults to None.
        along_track_dim (str, optional):
            Name of the dimension to concatenate along. Defaults to ALONG_TRACK_DIM.
        func (Callable, optional):
            Function to apply to each frame after loading. Defaults to None.
        func_inputs (Sequence[dict], optional):
            Optional per-frame keyword arguments to pass to `func`. Defaults to None.
        max_num_files (int, optional):
            Max. number of files that are allowed to be loaded at once.
            A `ValueError` is raised if above. Defaults to 8 (e.g., full orbit).
        coarsen (bool, optional):
            If True, the read datasets are coarsened depending on the number of given
            files. Only applicable when not zooming. Defaults to True.

    Returns:
        Dataset: Concatenated dataset with all frames along `along_track_dim`. If only
            a single file is given, that product is returned (after applying `func`,
            if any) without zooming or coarsening.

    Raises:
        ValueError: If no file path is given or more than `max_num_files` are given.
        IndexError: If `func_inputs` has fewer entries than needed for a frame index.
    """
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    elif isinstance(filepaths, pd.DataFrame):
        df = filepaths.sort_values(by="orbit_and_frame")
        filepaths = df["filepath"].tolist()
    else:
        df = ProductDataFrame.from_files(list(filepaths)).sort_values(by="orbit_and_frame")
        df.validate_columns()
        filepaths = df["filepath"].tolist()

    def apply_func(ds: Dataset, i: int) -> Dataset:
        """Apply a processing function to a dataset if specified."""
        if func is None:
            return ds
        if func_inputs is None:
            return func(ds)
        if i < len(func_inputs):
            return func(ds, **func_inputs[i])
        raise IndexError("Too few function inputs provided")

    if len(filepaths) == 0:
        raise ValueError("Given sequence of product files paths is empty")
    elif len(filepaths) == 1:
        warnings.warn("Can not concatenate frames since only one file path was given")
        # Still honour `func` so the result is processed the same way as in the
        # multi-file case (previously `func` was silently skipped here).
        return apply_func(read_product(filepaths[0]), 0)
    elif len(filepaths) > max_num_files:
        raise ValueError(
            f"Too many files provided: {len(filepaths)} (currently maximum allowed is {max_num_files}). "
            "Please reduce the number of files or increase the allowed amount by setting the argument max_num_files."
        )
    elif len(filepaths) > 8:
        warnings.warn(
            f"You provided {len(filepaths)} files, which is more than one full orbit (8 files). "
            "Processing might take longer than usual."
        )

    num_files = len(filepaths)
    ds: xr.Dataset | None = None

    if zoom_at is not None:
        # Zoomed read: select portions of two adjacent frames
        # (np.unique collapses both indices into one when zoom_at is a whole number).
        frame_indices = np.unique([int(np.floor(zoom_at)), int(np.ceil(zoom_at))])
        offset = zoom_at - frame_indices[0]
        # From here on `filepaths` only holds the frames that are actually read,
        # which is also what ends up in the "sources" encoding below.
        filepaths = [filepaths[i] for i in frame_indices]

        for i, filepath in enumerate(filepaths):
            with read_product(filepath) as frame_ds:
                # Use the index into the original file list so that `func_inputs`
                # lines up with the frame it was given for.
                frame_ds = apply_func(frame_ds, frame_indices[i])

                # Preserve original dtypes
                original_dtypes = {v: frame_ds[v].dtype for v in frame_ds.variables}

                # Select relevant portion of the frame: the tail of the first frame
                # (from the fractional offset on) and the head of the second one.
                n = len(frame_ds[along_track_dim])
                sel_slice = (
                    slice(int(np.floor(n * offset)), n)
                    if i == 0
                    else slice(0, int(np.ceil(n * offset)))
                )
                frame_ds = frame_ds.sel({along_track_dim: sel_slice})

                # Restore dtypes
                for v, dtype in original_dtypes.items():
                    frame_ds[v] = frame_ds[v].astype(dtype)

                ds = (
                    frame_ds.copy()
                    if ds is None
                    else concat_datasets(ds.copy(), frame_ds.copy(), dim=along_track_dim)
                )

    else:
        # Full read and coarsen each frame
        for i, filepath in enumerate(filepaths):
            with read_product(filepath) as frame_ds:
                frame_ds = apply_func(frame_ds, i)

                if coarsen:
                    original_dtypes = {v: frame_ds[v].dtype for v in frame_ds.variables}

                    # Window size equals the number of files, so the concatenated
                    # result stays comparable in length to a single frame.
                    coarsen_dims = {along_track_dim: num_files}

                    # Circular mean for longitude
                    lon_coarse = (
                        frame_ds["longitude"]
                        .coarsen(coarsen_dims, boundary="trim")
                        .reduce(circular_mean_np)
                    )
                    # Attributes are stripped before merging and re-attached afterwards.
                    _tmp_attrs = lon_coarse.attrs
                    lon_coarse.attrs = {}

                    # Regular mean for the rest
                    rest = (
                        frame_ds.drop_vars("longitude")
                        .coarsen(coarsen_dims, boundary="trim")
                        .mean()  # type: ignore
                    )

                    # Merge results
                    frame_ds = xr.merge([lon_coarse, rest])
                    frame_ds["longitude"].attrs = _tmp_attrs

                    # Averaging may change dtypes; cast back to the original ones.
                    for v, dtype in original_dtypes.items():
                        frame_ds[v] = frame_ds[v].astype(dtype)

                ds = frame_ds if ds is None else concat_datasets(ds, frame_ds, dim=along_track_dim)

    # Set output file sources
    if isinstance(ds, Dataset):
        ds.encoding["sources"] = list(filepaths)
        return ds
    else:
        raise RuntimeError("Bad implementation")

read_science_data

read_science_data(
    filepath: str, agency: Union[FileAgency, None] = None, ensure_nans: bool = False, **kwargs
) -> Dataset

Opens the science data of a EarthCARE file as a xarray.Dataset.

Source code in earthcarekit/read/science.py
def read_science_data(
    filepath: str,
    agency: Union["FileAgency", None] = None,
    ensure_nans: bool = False,
    **kwargs,
) -> xr.Dataset:
    """Opens the science data of a EarthCARE file as a `xarray.Dataset`.

    Args:
        filepath (str): Path to the EarthCARE product file.
        agency (FileAgency | None, optional): Agency that produced the file. If None,
            it is inferred from `filepath` via `FileAgency.from_input`. Defaults to None.
        ensure_nans (bool, optional): If True, the opened dataset is passed through
            `_convert_all_fill_values_to_nan` before it is returned. Defaults to False.
        **kwargs: Additional keyword arguments forwarded to `xarray.open_dataset`.

    Returns:
        xr.Dataset: The "ScienceData" group for ESA files, or the merged
            "ScienceData/Data" and "ScienceData/Geo" groups for JAXA files.

    Raises:
        NotImplementedError: If the agency is neither ESA nor JAXA.
    """
    resolved_agency = FileAgency.from_input(filepath) if agency is None else agency

    if resolved_agency == FileAgency.ESA:
        science_ds = xr.open_dataset(filepath, group="ScienceData", engine=_engine, **kwargs)
    elif resolved_agency == FileAgency.JAXA:
        # JAXA files split the science data into two sub-groups; open both
        # (Geo first, then Data) and combine them into one dataset.
        geo_ds, data_ds = (
            xr.open_dataset(
                filepath,
                group=group_name,
                engine=_engine,
                phony_dims="sort",
                **kwargs,
            )
            for group_name in ("ScienceData/Geo", "ScienceData/Data")
        )
        science_ds = xr.merge([data_ds, geo_ds])
        # Carry the source entry of the data group over to the merged dataset.
        science_ds.encoding["source"] = data_ds.encoding["source"]
    else:
        raise NotImplementedError()

    return _convert_all_fill_values_to_nan(science_ds) if ensure_nans else science_ds

rebin_msi_to_jsg

rebin_msi_to_jsg(
    ds_msi: Dataset | str,
    ds_xjsg: Dataset | str,
    vars: list[str] | None = None,
    k: int = 4,
    eps: float = 1e-12,
    lat_var: str = SWATH_LAT_VAR,
    lon_var: str = SWATH_LON_VAR,
    time_var: str = TIME_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    across_track_dim: str = ACROSS_TRACK_DIM,
    lat_var_xjsg: str = SWATH_LAT_VAR,
    lon_var_xjsg: str = SWATH_LON_VAR,
    time_var_xjsg: str = TIME_VAR,
    along_track_dim_xjsg: str = ALONG_TRACK_DIM,
    across_track_dim_xjsg: str = ACROSS_TRACK_DIM,
) -> Dataset

Rebins variables from an MSI product dataset onto the geospatial lat/lon grid given by the related AUX_JSG_1D dataset.

This function interpolates selected variables from ds_msi onto the JSG grid from ds_xjsg using a quick kd-tree nearest-neighbor search with scipy.spatial.cKDTree, followed by averaging the k-nearest points using inverse distance weighting. The resulting dataset matches the along- and across-track resolution of ds_xjsg.

Parameters:

Name Type Description Default
ds_msi Dataset | str

The source MSI dataset (e.g., MSI_RGR_1C, MSI_COP_2A, ...).

required
ds_xjsg Dataset | str

The target XJSG dataset.

required
vars list[str] | None

List of variable names from ds_msi to rebin. If None, all data variables are considered. Defaults to None.

None
k int

Number of nearest geospatial neighbors to include in the kd-tree search. Defaults to 4.

4
eps float

Numerical threshold to avoid division by zero in distance calculations during the kd-tree search. Defaults to 1e-12.

1e-12

Returns:

Type Description
Dataset

xr.Dataset: The MSI dataset with variables rebinned to the JSG grid.

Source code in earthcarekit/read/product/_rebin_msi_to_jsg.py
def rebin_msi_to_jsg(
    ds_msi: xr.Dataset | str,
    ds_xjsg: xr.Dataset | str,
    vars: list[str] | None = None,
    k: int = 4,
    eps: float = 1e-12,
    lat_var: str = SWATH_LAT_VAR,
    lon_var: str = SWATH_LON_VAR,
    time_var: str = TIME_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    across_track_dim: str = ACROSS_TRACK_DIM,
    lat_var_xjsg: str = SWATH_LAT_VAR,
    lon_var_xjsg: str = SWATH_LON_VAR,
    time_var_xjsg: str = TIME_VAR,
    along_track_dim_xjsg: str = ALONG_TRACK_DIM,
    across_track_dim_xjsg: str = ACROSS_TRACK_DIM,
) -> xr.Dataset:
    """
    Rebins variables from an MSI product dataset onto the geospatial lat/lon grid given by the related AUX_JSG_1D dataset.

    This function interpolates selected variables from `ds_msi` onto the JSG grid from `ds_xjsg`
    using quick kd-tree nearest-neighbor search with `scipy.spatial.cKDTree` followed
    by averaging the `k`-nearest points using inverse distance weighting. The resulting dataset
    matches the along- and across-track resolution of `ds_xjsg`.

    Args:
        ds_msi (xr.Dataset | str): The source MSI dataset (e.g., MSI_RGR_1C, MSI_COP_2A, ...).
        ds_xjsg (xr.Dataset | str): The target XJSG dataset.
        vars (list[str] | None, optional): List of variable names from `ds_msi` to rebin.
            If None, all variables of `ds_msi` are considered. Defaults to None.
        k (int, optional): Number of nearest geospatial neighbors to include in the kd-tree search.
            Defaults to 4.
        eps (float, optional): Numerical threshold to avoid division by zero in distance calculations during the kd-tree search.
            Defaults to 1e-12.
        lat_var (str, optional): Name of the latitude variable in `ds_msi`. Defaults to SWATH_LAT_VAR.
        lon_var (str, optional): Name of the longitude variable in `ds_msi`. Defaults to SWATH_LON_VAR.
        time_var (str, optional): Name under which the XJSG time variable is stored in the result.
            Defaults to TIME_VAR.
        along_track_dim (str, optional): Name of the along-track dimension in `ds_msi` (and in the result).
            Defaults to ALONG_TRACK_DIM.
        across_track_dim (str, optional): Name of the across-track dimension in `ds_msi` (and in the result).
            Defaults to ACROSS_TRACK_DIM.
        lat_var_xjsg (str, optional): Name of the latitude variable in `ds_xjsg`. Defaults to SWATH_LAT_VAR.
        lon_var_xjsg (str, optional): Name of the longitude variable in `ds_xjsg`. Defaults to SWATH_LON_VAR.
        time_var_xjsg (str, optional): Name of the time variable in `ds_xjsg`. Defaults to TIME_VAR.
        along_track_dim_xjsg (str, optional): Name of the along-track dimension in `ds_xjsg`.
            Defaults to ALONG_TRACK_DIM.
        across_track_dim_xjsg (str, optional): Name of the across-track dimension in `ds_xjsg`.
            Defaults to ACROSS_TRACK_DIM.

    Returns:
        xr.Dataset: The MSI dataset with variables rebinned to the JSG grid.

    Raises:
        KeyError: If a name given in `vars` is not a variable of `ds_msi`.
    """

    def _read_msi() -> xr.Dataset:
        if isinstance(ds_msi, str):
            return read_product(ds_msi)
        return ds_msi

    def _read_xjsg() -> xr.Dataset:
        if isinstance(ds_xjsg, str):
            return read_product(ds_xjsg)
        return ds_xjsg

    with (
        _read_msi() as ds_msi,
        _read_xjsg() as ds_xjsg,
    ):
        if vars is None:
            vars = [str(v) for v in ds_msi.variables]
        else:
            for var in vars:
                if var not in ds_msi.variables:
                    present_vars = [str(v) for v in ds_msi.variables]
                    raise KeyError(
                        f"""MSI dataset does not contain variable "{var}". Present variables are: {", ".join(present_vars)}"""
                    )

        # Bring the XJSG dataset onto the dimension names used for the output.
        ds_xjsg = ds_xjsg.copy().swap_dims(
            {
                along_track_dim_xjsg: along_track_dim,
                across_track_dim_xjsg: across_track_dim,
            }
        )

        # Move the original MSI dimensions out of the way so the rebinned variables
        # can use the plain along-/across-track dimension names.
        new_ds_msi = ds_msi.copy().swap_dims(
            {
                along_track_dim: f"{along_track_dim}_original",
                across_track_dim: f"{across_track_dim}_original",
            }
        )
        new_ds_msi[time_var] = ds_xjsg[time_var_xjsg].copy()

        lat_msi = ds_msi[lat_var].values.flatten()
        lon_msi = ds_msi[lon_var].values.flatten()
        coords_msi = sequence_geo_to_ecef(lat_msi, lon_msi)

        lat_jsg = ds_xjsg[lat_var_xjsg].values.flatten()
        lon_jsg = ds_xjsg[lon_var_xjsg].values.flatten()
        coords_jsg = sequence_geo_to_ecef(lat_jsg, lon_jsg)

        # One neighbor search on the flattened grids, reused for every variable.
        tree = cKDTree(coords_msi)
        dists, idxs = tree.query(coords_jsg, k=k)

        # Shape of the target grid, taken from the configured XJSG latitude variable.
        target_shape = ds_xjsg[lat_var_xjsg].shape

        dims: str | tuple[str, str]
        for var in vars:
            if ds_msi[var].dims == (along_track_dim, across_track_dim):
                dims = (along_track_dim, across_track_dim)

                values_flat = ds_msi[var].values.flatten()

                if k > 1:
                    # Values of the k nearest source pixels per target pixel
                    # (fancy indexing returns a copy, so editing it is safe).
                    _v = values_flat[idxs]

                    # Push NaN neighbors infinitely far away so they get zero weight.
                    # np.where builds a new array: the shared `dists` must stay
                    # untouched, otherwise NaNs of one variable would distort the
                    # weights of all variables processed after it.
                    mask_nan = np.isnan(_v)
                    _dists = np.where(mask_nan, np.inf, dists)

                    # Inverse distance weighting
                    weights = 1.0 / (_dists + eps)
                    weights /= np.sum(weights, axis=1, keepdims=True)

                    if np.issubdtype(_v.dtype, np.floating):
                        # Zero out single NaNs (they carry zero weight anyway), but
                        # keep targets whose neighbors are all NaN as NaN.
                        m = np.all(np.isnan(_v), axis=1)
                        _v[np.isnan(_v)] = 0.0
                        _v[m] = np.nan

                    new_values = np.sum(_v * weights, axis=1)
                else:
                    # Plain nearest-neighbor lookup
                    new_values = values_flat[idxs]

                new_values = new_values.reshape(target_shape)

                new_var = f"{var}"
                new_ds_msi[new_var] = (dims, new_values)
                new_ds_msi[new_var].attrs = ds_msi[var].attrs
            elif var not in _SKIP_VARS and var in ds_msi and var in ds_xjsg:
                # Variables without the 2D swath layout that exist in both datasets
                # are taken over from the XJSG dataset.
                new_ds_msi[var] = ds_xjsg[var].copy()
                new_ds_msi[var].attrs = ds_xjsg[var].attrs
            else:
                continue

        return new_ds_msi

rebin_xmet_to_vertical_track

rebin_xmet_to_vertical_track(
    ds_xmet: Dataset | str,
    ds_vert: Dataset | str,
    vars: list[str] | None = None,
    k: int = 4,
    eps: float = 1e-12,
    lat_var: str = TRACK_LAT_VAR,
    lon_var: str = TRACK_LON_VAR,
    time_var: str = TIME_VAR,
    height_var: str = HEIGHT_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    height_dim: str = VERTICAL_DIM,
    xmet_lat_var: str = "latitude",
    xmet_lon_var: str = "longitude",
    xmet_height_var: str = "geometrical_height",
    xmet_height_dim: str = "height",
    xmet_horizontal_grid_dim: str = "horizontal_grid",
) -> Dataset

Rebins variables from an AUX_MET_1D (XMET) dataset onto the vertical curtain track given by another dataset (e.g. ATL_EBD_2A).

This function interpolates selected variables from ds_xmet onto a EarthCARE vertical track given in ds_vert, using quick horizontal kd-tree nearest-neighbor search with scipy.spatial.cKDTree followed by averaging the k-nearest vertical XMET profiles using inverse distance weighting. The resulting profiles are then interpolated in the vertical to match the height resolution of ds_vert.

Parameters:

Name Type Description Default
ds_xmet Dataset | str

The source XMET dataset from which vertical curtain along track will be interpolated.

required
ds_vert Dataset | str

The target dataset containing the vertical curtain track.

required
vars list[str] | None

List of variable names from ds_xmet to rebin. If None, all data variables are considered.

None
k int

Number of nearest horizontal neighbors to include in the kd-tree search. Defaults to 4.

4
eps float

Numerical threshold to avoid division by zero in distance calculations during the kd-tree search. Defaults to 1e-12.

1e-12
lat_var str

Name of the latitude variable in ds_vert. Defaults to TRACK_LAT_VAR.

TRACK_LAT_VAR
lon_var str

Name of the longitude variable in ds_vert. Defaults to TRACK_LON_VAR.

TRACK_LON_VAR
time_var str

Name of the time variable in ds_vert. Defaults to TIME_VAR.

TIME_VAR
height_var str

Name of the height variable in ds_vert. Defaults to HEIGHT_VAR.

HEIGHT_VAR
along_track_dim str

Name of the along-track dimension in ds_vert. Defaults to ALONG_TRACK_DIM.

ALONG_TRACK_DIM
height_dim str

Name of the vertical or height dimension in ds_vert. Defaults to VERTICAL_DIM.

VERTICAL_DIM
xmet_lat_var str

Name of the latitude variable in ds_xmet. Defaults to "latitude".

'latitude'
xmet_lon_var str

Name of the longitude variable in ds_xmet. Defaults to "longitude".

'longitude'
xmet_height_var str

Name of the height variable in ds_xmet. Defaults to "geometrical_height".

'geometrical_height'
xmet_height_dim str

Name of the vertical dimension in ds_xmet. Defaults to "height".

'height'
xmet_horizontal_grid_dim str

Name of the horizontal grid dimension in ds_xmet. Defaults to "horizontal_grid".

'horizontal_grid'

Returns:

Type Description
Dataset

xr.Dataset: A new dataset containing the selected XMET variables interpolated to the grid of the vertical curtain given in ds_vert. This new dataset has the same along-track and vertical dimensions as ds_vert.

Raises:

Type Description
KeyError

If any specified variable or coordinate name is not found in ds_xmet.

Source code in earthcarekit/read/product/_rebin_xmet_to_vertical_track.py
def rebin_xmet_to_vertical_track(
    ds_xmet: xr.Dataset | str,
    ds_vert: xr.Dataset | str,
    vars: list[str] | None = None,
    k: int = 4,
    eps: float = 1e-12,
    lat_var: str = TRACK_LAT_VAR,
    lon_var: str = TRACK_LON_VAR,
    time_var: str = TIME_VAR,
    height_var: str = HEIGHT_VAR,
    along_track_dim: str = ALONG_TRACK_DIM,
    height_dim: str = VERTICAL_DIM,
    xmet_lat_var: str = "latitude",
    xmet_lon_var: str = "longitude",
    xmet_height_var: str = "geometrical_height",
    xmet_height_dim: str = "height",
    xmet_horizontal_grid_dim: str = "horizontal_grid",
) -> xr.Dataset:
    """
    Rebins variables from an AUX_MET_1D (XMET) dataset onto the vertical curtain track given by another dataset (e.g. ATL_EBD_2A).

    This function interpolates selected variables from `ds_xmet` onto a EarthCARE
    vertical track given in `ds_vert`, using quick horizontal kd-tree nearest-neighbor search with `scipy.spatial.cKDTree` followed
    by averaging the `k`-nearest vertical XMET profiles using inverse distance weighting. The resulting
    profiles are then interpolated in the vertical to match the height resolution of `ds_vert`.

    If `ds_xmet` is already a dataset that has both `along_track_dim` and `height_dim`,
    it is returned unchanged.

    Args:
        ds_xmet (xr.Dataset | str): The source XMET dataset from which vertical curtain along track will be interpolated.
        ds_vert (xr.Dataset | str): The target dataset containing the vertical curtain track.
        vars (list[str] | None, optional): List of variable names from `ds_xmet` to rebin.
            If None, all variables of `ds_xmet` are considered. The given list is not modified.
        k (int, optional): Number of nearest horizontal neighbors to include in the kd-tree search.
            Defaults to 4.
        eps (float, optional): Numerical threshold to avoid division by zero in distance calculations during the kd-tree search.
            Defaults to 1e-12.
        lat_var (str, optional): Name of the latitude variable in `ds_vert`.
            Defaults to TRACK_LAT_VAR.
        lon_var (str, optional): Name of the longitude variable in `ds_vert`.
            Defaults to TRACK_LON_VAR.
        time_var (str, optional): Name of the time variable in `ds_vert`.
            Defaults to TIME_VAR.
        height_var (str, optional): Name of the height variable in `ds_vert`.
            Defaults to HEIGHT_VAR.
        along_track_dim (str, optional): Name of the along-track dimension in `ds_vert`.
            Defaults to ALONG_TRACK_DIM.
        height_dim (str, optional): Name of the vertical or height dimension in `ds_vert`.
            Defaults to VERTICAL_DIM.
        xmet_lat_var (str, optional): Name of the latitude variable in `ds_xmet`.
            Defaults to "latitude".
        xmet_lon_var (str, optional): Name of the longitude variable in `ds_xmet`.
            Defaults to "longitude".
        xmet_height_var (str, optional): Name of the height variable in `ds_xmet`.
            Defaults to "geometrical_height".
        xmet_height_dim (str, optional): Name of the vertical dimension in `ds_xmet`.
            Defaults to "height".
        xmet_horizontal_grid_dim (str, optional): Name of the horizontal grid dimension in `ds_xmet`.
            Defaults to "horizontal_grid".

    Returns:
        xr.Dataset: A new dataset containing the selected XMET variables interpolated to the grid of the
            vertical curtain given in `ds_vert`. This new dataset has the same along-track and vertical
            dimensions as `ds_vert`.

    Raises:
        KeyError: If any specified variable or coordinate name is not found in `ds_xmet`.
        ValueError: If `ds_xmet` has neither a `xmet_height_dim` nor a VERTICAL_DIM dimension.
    """
    # Return given dataset, if nadir cross-section has already been extracted from it.
    if (
        isinstance(ds_xmet, xr.Dataset)
        and along_track_dim in ds_xmet.sizes
        and height_dim in ds_xmet.sizes
    ):
        return ds_xmet

    def _read_xmet() -> xr.Dataset:
        if isinstance(ds_xmet, str):
            return read_product(ds_xmet)
        return ds_xmet

    def _read_vert() -> xr.Dataset:
        if isinstance(ds_vert, str):
            return read_product(ds_vert)
        return ds_vert

    with (
        _read_xmet() as ds_xmet,
        _read_vert() as ds_vert,
    ):
        if vars is None:
            vars = [str(v) for v in ds_xmet.variables]
        else:
            for var in vars:
                if var not in ds_xmet.variables:
                    present_vars = [str(v) for v in ds_xmet.variables]
                    raise KeyError(
                        f"""X-MET dataset does not contain variable "{var}". Present variables are: {", ".join(present_vars)}"""
                    )

        # Rename the XMET vertical dimension to a temporary name so that the
        # rebinned variables can use `height_dim` of the target track.
        if xmet_height_dim in ds_xmet.sizes:
            new_ds_xmet = ds_xmet.copy().swap_dims({xmet_height_dim: "tmp_xmet_height"})
        elif VERTICAL_DIM in ds_xmet.sizes:
            new_ds_xmet = ds_xmet.copy().swap_dims({VERTICAL_DIM: "tmp_xmet_height"})
        else:
            raise ValueError(
                f"no dimension named '{xmet_height_dim}' or '{VERTICAL_DIM}' in X-MET dataset ({ds_xmet.sizes})"
            )
        new_ds_xmet[time_var] = ds_vert[time_var].copy()
        new_ds_xmet[height_var] = ds_vert[height_var].copy()

        hgrid_lat = ds_xmet[xmet_lat_var].values.flatten()
        hgrid_lon = ds_xmet[xmet_lon_var].values.flatten()
        hgrid_alt = ds_xmet[xmet_height_var].values
        hgrid_coords = sequence_geo_to_ecef(hgrid_lat, hgrid_lon)

        track_lat = ds_vert[lat_var].values
        track_lon = ds_vert[lon_var].values
        track_alt = ds_vert[height_var].values
        track_coords = sequence_geo_to_ecef(track_lat, track_lon)

        idxs, weights, height = _grid_along_track(
            hgrid_coords=hgrid_coords,
            target_coords=track_coords,
            hgrid_alt=hgrid_alt,
            k=k,
            eps=eps,
        )

        # Handle longitudes separately to account for sign changes at the dateline.
        # Build a filtered copy instead of calling `vars.remove(...)`, which would
        # modify the list object passed in by the caller.
        vars = [v for v in vars if v != xmet_lon_var]

        # Interpolate the ECEF coordinates of the grid points and convert the
        # result back to geographic coordinates to obtain the longitudes.
        new_coords = _interp_values_along_track_1d(
            kdtree_idxs=idxs,
            kdtree_weights=weights.reshape((*weights.shape, 1)),
            hgrid_values=hgrid_coords,
            k=k,
        )

        new_lons = sequence_ecef_to_geo(
            x=new_coords[:, 0],
            y=new_coords[:, 1],
            z=new_coords[:, 2],
        )[:, 1]

        new_ds_xmet[xmet_lon_var] = xr.DataArray(
            data=new_lons,
            dims=along_track_dim,
            attrs=new_ds_xmet[xmet_lon_var].attrs,
        )

        # Handle all remaining variables
        dims: str | tuple[str, str]
        for var in vars:
            values = ds_xmet[var].values
            # Scalars have nothing to interpolate and are left as they are.
            if len(values.shape) == 0:
                continue

            if len(values.shape) == 1:
                dims = along_track_dim

                new_values = _interp_values_along_track_1d(
                    kdtree_idxs=idxs,
                    kdtree_weights=weights,
                    hgrid_values=values,
                    k=k,
                )
            else:
                dims = (along_track_dim, height_dim)

                new_values = _interp_values_along_track_2d(
                    kdtree_idxs=idxs,
                    kdtree_weights=weights,
                    target_gridded_alt=height,
                    target_alt=track_alt,
                    hgrid_values=values,
                    k=k,
                )

            new_var = f"{var}"
            new_ds_xmet[new_var] = (dims, new_values)
            new_ds_xmet[new_var].attrs = ds_xmet[var].attrs

        # Remove original horizontal grid dims and associated variables
        new_ds_xmet = remove_dims(new_ds_xmet, [xmet_horizontal_grid_dim, xmet_height_dim])

        return new_ds_xmet

search_files_by_regex

search_files_by_regex(root_dirpath: str, regex_pattern: str) -> list[str]

Recursively searches for files in a directory that match a given regex pattern.

Parameters:

Name Type Description Default
root_dirpath str

The root directory to start the search from.

required
regex_pattern str

A regular expression pattern to match file names against.

required
Returns:

list[str]: A list of absolute file paths that point to files with matching names.

Raises:

Type Description
FileNotFoundError

If the root directory does not exist.

error

If the given pattern is not a valid regular expression.

Source code in earthcarekit/utils/path.py
def search_files_by_regex(root_dirpath: str, regex_pattern: str) -> list[str]:
    """Walk a directory tree and collect the files whose names match a regex.

    Only the file name (not its directory part) is tested against the pattern,
    using `re.search`, so the pattern may match anywhere in the name.

    Args:
        root_dirpath (str): The root directory to start the search from.
        regex_pattern (str): A regular expression pattern to match file names against.

    Returns:
        list[str]: Paths of all matching files, each built by joining the walked
            directory path with the file name, in `os.walk` order.

    Raises:
        FileNotFoundError: If the root directory does not exist.
        re.error: If the given pattern is not a valid regular expression
            (raised once a file name is tested against it).
    """
    if not os.path.exists(root_dirpath):
        raise FileNotFoundError(
            f"{search_files_by_regex.__name__}() Root directory does not exist: {root_dirpath}"
        )

    return [
        os.path.join(current_dir, name)
        for current_dir, _subdirs, names in os.walk(root_dirpath)
        for name in names
        if re.search(regex_pattern, name)
    ]

search_product

search_product(
    root_dirpath: str | None = None,
    config: str | ECKConfig | None = None,
    file_type: str | Sequence[str] | None = None,
    agency: str | Sequence[str] | None = None,
    latency: str | Sequence[str] | None = None,
    timestamp: TimestampLike | Sequence[TimestampLike] | None = None,
    baseline: str | Sequence[str] | None = None,
    orbit_and_frame: str | Sequence[str] | None = None,
    orbit_number: int | str | Sequence[int | str] | None = None,
    frame_id: str | Sequence[str] | None = None,
    filename: str | Sequence[str] | None = None,
    start_time: TimestampLike | None = None,
    end_time: TimestampLike | None = None,
    mode: Literal["exhaustive", "fast"] = "exhaustive",
    read_geo_from_hdr: bool = False,
) -> ProductDataFrame

Searches for EarthCARE product files matching given metadata filters.

Parameters:

Name Type Description Default
root_dirpath str

Root directory to search. Defaults to directory given in a configuration file.

None
config str | ECKConfig | None

Path to a config.toml file or a ECKConfig instance. Defaults to the default configuration file path.

None
file_type str | Sequence[str]

Product file type(s) to match.

None
agency str | Sequence[str]

Producing agency or agencies (e.g. "ESA" or "JAXA").

None
latency str | Sequence[str]

Data latency level(s).

None
timestamp TimestampLike | Sequence

Timestamp(s) included in the product's time coverage.

None
baseline str | Sequence[str]

Baseline version(s).

None
orbit_and_frame str | Sequence[str]

Orbit and frame identifiers.

None
orbit_number int | str | Sequence

Orbit number(s).

None
frame_id str | Sequence[str]

Frame identifier(s).

None
filename str | Sequence[str]

Specific filename(s) or regular expression patterns to match.

None
start_time TimestampLike

First timestamp included in the product's time coverage.

None
end_time TimestampLike

Last timestamp included in the product's time coverage.

None
mode Literal['exhaustive', 'fast']

Search strategy controlling completeness vs. performance: the "exhaustive" mode recursively scans all files under the root directory, while the "fast" mode searches files only at expected paths and may miss files outside the standard data folder structure defined during the configuration of earthcarekit.

'exhaustive'
read_geo_from_hdr bool

If True, reads start and end lat/lon from existing header files (.HDR) and fills in respective columns in the resulting table.

False

Returns:

Name Type Description
results ProductDataFrame

Filtered table of matching product files as a pandas.DataFrame-based object.

Raises:

Type Description
FileNotFoundError

If root directory does not exist.

Source code in earthcarekit/read/product/_search.py
def search_product(
    root_dirpath: str | None = None,
    config: str | ECKConfig | None = None,
    file_type: str | Sequence[str] | None = None,
    agency: str | Sequence[str] | None = None,
    latency: str | Sequence[str] | None = None,
    timestamp: TimestampLike | Sequence[TimestampLike] | None = None,
    baseline: str | Sequence[str] | None = None,
    orbit_and_frame: str | Sequence[str] | None = None,
    orbit_number: int | str | Sequence[int | str] | None = None,
    frame_id: str | Sequence[str] | None = None,
    filename: str | Sequence[str] | None = None,
    start_time: TimestampLike | None = None,
    end_time: TimestampLike | None = None,
    mode: Literal["exhaustive", "fast"] = "exhaustive",
    read_geo_from_hdr: bool = False,
) -> ProductDataFrame:
    """
    Searches for EarthCARE product files matching given metadata filters.

    Args:
        root_dirpath (str, optional): Root directory to search. Defaults to directory given in a configuration file.
        config (str | ECKConfig | None , optional): Path to a `config.toml` file or an ECKConfig instance. Defaults to the default configuration file path.
        file_type (str | Sequence[str], optional): Product file type(s) to match. An entry may be written as
            `"<file_type>:<baseline>"` to set the baseline for that entry directly.
        agency (str | Sequence[str], optional): Producing agency or agencies (e.g. "ESA" or "JAXA").
        latency (str | Sequence[str], optional): Data latency level(s).
        timestamp (TimestampLike | Sequence, optional): Timestamp(s) included in the product's time coverage.
        baseline (str | Sequence[str], optional): Baseline version(s). A single string applies to all given file types;
            a sequence is matched to `file_type` by position. If file types are given without a baseline, "latest" is used.
        orbit_and_frame (str | Sequence[str], optional): Orbit and frame identifiers.
        orbit_number (int | str | Sequence, optional): Orbit number(s).
        frame_id (str | Sequence[str], optional): Frame identifier(s).
        filename (str | Sequence[str], optional): Specific filename(s) or regular expression patterns to match.
        start_time (TimestampLike, optional): First timestamp included in the product's time coverage.
        end_time (TimestampLike, optional): Last timestamp included in the product's time coverage.
        mode (Literal["exhaustive", "fast"], optional): Search strategy controlling completeness vs performance; the "exhaustive" mode
            recursively scans all files under `root_dirpath`, while the "fast" mode searches files only at expected paths
            and may miss files outside the standard data folder structure defined during the configuration of earthcarekit.
        read_geo_from_hdr (bool, optional): If True, reads start and end lat/lon from existing header files (`.HDR`) and fills in respective columns in the resulting table.

    Returns:
        results (ProductDataFrame): Filtered table of matching product files as a `pandas.DataFrame`-based object.

    Raises:
        FileNotFoundError: If root directory does not exist.
        IndexError: If `baseline` is a sequence that is shorter than `file_type`.
        ValueError: If `mode` is "fast" and a file type has a level without a known data sub-directory.
        TypeError: If `filename` is neither a string, a sequence nor None.
    """
    if not isinstance(config, ECKConfig):
        config = read_config(config)

    if not isinstance(root_dirpath, str):
        root_dirpath = config.path_to_data

    if not os.path.exists(root_dirpath):
        raise FileNotFoundError(f"Given root directory does not exist: {root_dirpath}")

    mission_id = "ECA"

    # Pair every requested file type with a baseline. The baseline comes from
    # the "<file_type>:<baseline>" syntax, from the `baseline` argument (single
    # value or per-position), or falls back to "latest".
    if isinstance(file_type, str):
        file_type = [file_type]
    if isinstance(file_type, Sequence):
        _baseline: list[str] = []
        _file_type: list[str] = []
        for i, ft in enumerate(file_type):
            if isinstance(ft, str):
                _parts = ft.split(":")
                if len(_parts) == 2:
                    _file_type.append(_parts[0])
                    _baseline.append(_parts[1])
                    continue
            _file_type.append(ft)
            if isinstance(baseline, str):
                _baseline.append(baseline)
            elif isinstance(baseline, Sequence):
                try:
                    _baseline.append(baseline[i])
                except IndexError as e:
                    raise IndexError(e, "given baseline list is too small") from e
            else:
                _baseline.append("latest")
        file_type = _file_type
        baseline = _baseline
    file_type = _to_file_info_list(file_type, FileType)
    baseline = _format_input(
        baseline,
        file_types=file_type,
        default_input="..",
        format_func=validate_baseline,
    )
    baseline_and_file_type_list = [f"{bl}_{ft}" for bl, ft in zip(baseline, file_type)]
    baseline_and_file_type = _list_to_regex(baseline_and_file_type_list, ".._..._..._..")

    # Each filter is turned into a regex fragment; the second argument of
    # `_list_to_regex` is the wildcard fragment used as the default.
    agency = _to_file_info_list(agency, FileAgency)
    agency = _list_to_regex(agency, ".")

    latency = _to_file_info_list(latency, FileLatency)
    latency = _list_to_regex(latency, ".")

    # `start_time` and `end_time` are also treated as single timestamps that
    # a product's time coverage may contain.
    timestamp = _format_input(timestamp, format_func=to_timestamp)
    _start_time = [] if start_time is None else [to_timestamp(start_time)]
    _end_time = [] if end_time is None else [to_timestamp(end_time)]
    timestamp = timestamp + _start_time + _end_time

    any_orbit_and_frame = "." * 6
    any_orbit_number = "." * 5
    any_frame_id = "."

    orbit_and_frame = _format_input(orbit_and_frame, format_func=format_orbit_and_frame)
    orbit_and_frame = _list_to_regex(orbit_and_frame, any_orbit_and_frame)

    orbit_number = _format_input(orbit_number, format_func=format_orbit_number)
    orbit_number = _list_to_regex(orbit_number, any_orbit_number)

    frame_id = _format_input(frame_id, format_func=format_frame_id)
    frame_id = _list_to_regex(frame_id, any_frame_id)

    # Combine the separate orbit number / frame ID filters with the combined
    # orbit-and-frame filter: if both kinds are given, either may match.
    if orbit_number != any_orbit_number or frame_id != any_frame_id:
        oaf = f"{orbit_number}{frame_id}"
        if orbit_and_frame != any_orbit_and_frame:
            oaf = f"(({oaf})|{orbit_and_frame})"
    else:
        oaf = orbit_and_frame

    pattern = f".*{mission_id}_{agency}{latency}{baseline_and_file_type}_........T......Z_........T......Z_{oaf}.h5"

    files: list[str]
    if pattern == ".*ECA_...._..._..._.._........T......Z_........T......Z_.......h5":
        # The pattern equals the all-wildcard default, i.e. no metadata filter
        # narrowed it: skip the pattern search, so files can only come from
        # the `filename` filter below.
        files = []
    elif mode == "fast" and len(file_type) > 0:
        files = []
        for ft in file_type:
            lvl = FileType.from_input(ft).get_level()
            _lvl_subdir = ""
            if lvl == "1B":
                _lvl_subdir = config.subdir_name_level1b
            elif lvl == "1C":
                _lvl_subdir = config.subdir_name_level1c
            elif lvl == "1D":
                _lvl_subdir = config.subdir_name_auxiliary_files
            elif lvl == "2A":
                _lvl_subdir = config.subdir_name_level2a
            elif lvl == "2B":
                _lvl_subdir = config.subdir_name_level2b
            else:
                raise ValueError(f"file type '{ft}' not supported for search mode '{mode}'")
            _root_dirpath = os.path.join(root_dirpath, _lvl_subdir, ft)

            # Narrow the search to a date sub-directory when one can be
            # derived from the requested time range.
            if start_time is not None:
                _date_subdir = _get_date_subdir(start_time, end_time)
                if isinstance(_date_subdir, str):
                    _root_dirpath = os.path.join(_root_dirpath, _date_subdir)

            if os.path.exists(_root_dirpath):
                print(f"Searching data at <{_root_dirpath}>")
                files.extend(search_files_by_regex(_root_dirpath, pattern))
    else:
        files = search_files_by_regex(root_dirpath, pattern)

    if isinstance(filename, (str, Sequence)):
        if isinstance(filename, str):
            filename = [filename]

        def _get_pattern(fn):
            return f".*{os.path.basename(fn).replace('.h5', '')}.*.h5"

        filename = [_get_pattern(fn) for fn in filename]
    elif filename is None:
        filename = []
    else:
        raise TypeError(f"Given filename has invalid type ({type(filename)}: {filename})")

    # Filename matches are always searched from the root directory and are
    # added to (not intersected with) the pattern-based results.
    for fn in filename:
        files.extend(search_files_by_regex(root_dirpath, fn))

    # Remove duplicates
    files = list(set(files))

    old_files = files.copy()
    if len(timestamp) > 0:
        # Keep a file if it contains any of the timestamps. Each file is
        # listed once, so its product info is not read repeatedly below.
        files = [
            f
            for f in old_files
            if any(_check_product_contains_timestamp(f, t) for t in timestamp)
        ]

    pdf = get_product_infos(files, read_geo_from_hdr=read_geo_from_hdr)

    if start_time is not None or end_time is not None:
        # Add the products selected by the time-range filter, evaluated on
        # the unfiltered candidate list.
        _pdf = get_product_infos(old_files, read_geo_from_hdr=read_geo_from_hdr)
        _pdf = _filter_time_range(_pdf, start_time=start_time, end_time=end_time)

        if not pdf.empty and not _pdf.empty:
            pdf = ProductDataFrame(pd.concat([pdf, _pdf], ignore_index=True))
        elif not _pdf.empty:
            pdf = _pdf

    pdf = pdf.sort_values(by=["orbit_and_frame", "file_type", "start_processing_time"])
    pdf = pdf.drop_duplicates()
    pdf = pdf.reset_index(drop=True)

    pdf.validate_columns()
    return pdf