Source code for xbout.xarraybackend

"""
xarray backend for reading ADIOS2 ``.bp`` files.

This backend provides an xarray ``BackendEntrypoint`` that can open ADIOS2
datasets via the ``adios2`` Python package. Variables are represented as
``LazilyIndexedArray`` objects backed by ADIOS2 selections.

On-disk conventions used by this backend:
- Per-variable dimension names are stored as attributes
  ``{varname}/__xarray_dimensions__``.
- Dataset-level attributes are stored under ``__xarray_dataset_attrs__/{key}``.
"""

from __future__ import annotations

import os

from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ItemsView

import numpy as np

from xarray import Dataset, Variable
from xarray.backends.common import (
    BackendArray,
    BackendEntrypoint,
    _normalize_path,
)

from xarray.core import indexing

if TYPE_CHECKING:  # pragma: no cover
    from adios2 import FileReader
    from io import BufferedIOBase
    from xarray.backends.common import AbstractDataStore


# Prefix of ADIOS2 attribute names that hold dataset-level attributes; the
# remainder of the name after the prefix is the xarray attribute key.
DATASET_ATTR_PREFIX = "__xarray_dataset_attrs__/"
# Per-variable attribute ``{varname}/__xarray_dimensions__`` holding the
# variable's dimension names.
XARRAY_DIMS_ATTR = "__xarray_dimensions__"
# Per-variable attribute ``{varname}/__xarray_original_dtype__`` naming the
# dtype the data is cast back to after reading.
XARRAY_ORIGINAL_DTYPE_ATTR = "__xarray_original_dtype__"

# Mapping from ADIOS2 type names (as reported in ``available_variables()``)
# to NumPy scalar types. Every value must be accepted by ``np.dtype(...)``,
# because the backend builds dtypes from this table.
adios_to_numpy_type = {
    # ``np.char`` is a module of string helpers, not a type; an ADIOS2
    # ``char`` is a one-byte integer.
    "char": np.int8,
    "int8_t": np.int8,
    "int16_t": np.int16,
    "int32_t": np.int32,
    "int64_t": np.int64,
    "uint8_t": np.uint8,
    "uint16_t": np.uint16,
    "uint32_t": np.uint32,
    "uint64_t": np.uint64,
    # ADIOS2 ``float`` is single precision; Python ``float`` would be float64.
    "float": np.float32,
    "double": np.double,
    "long double": np.longdouble,
    "float complex": np.complex64,
    "double complex": np.complex128,
    "string": np.str_,
}


class BoutADIOSBackendArray(BackendArray):
    """
    Lazily indexed array backed by an ADIOS2 Variable.

    xarray calls ``__getitem__`` with an ``ExplicitIndexer``; this class maps
    the indexer into ADIOS2 ``set_step_selection`` (for time/steps) and
    ``set_selection`` (for spatial dimensions), then reads the selection.
    """

    def __init__(
        self,
        shape: list,
        dtype: np.dtype,
        lock,
        adiosfile: FileReader,
        varname: str,
        *,
        cast_dtype: np.dtype | None = None,
    ):
        """
        Parameters
        ----------
        shape
            Full xarray-visible shape. If the ADIOS2 variable has steps, the
            first dimension is the synthetic xarray time dimension.
        dtype
            Numpy dtype used by ADIOS2 for the stored variable.
        lock
            Optional lock for thread-safety. When given, it is held while the
            selection is set and read, because the selection is state on the
            shared ADIOS2 variable object.
        adiosfile
            Open ADIOS2 ``FileReader`` handle.
        varname
            Name of the ADIOS2 variable to read.
        cast_dtype
            Optional dtype to cast to after reading (used to round-trip types
            that are stored differently on disk, e.g. ``bool`` stored as
            ``uint8``).
        """
        self.shape = shape
        self.cast_dtype = cast_dtype
        # Advertise the dtype that __getitem__ actually returns, so the lazy
        # array and the loaded data agree when a cast is applied.
        self.dtype = dtype if cast_dtype is None else np.dtype(cast_dtype)
        self.lock = lock
        self.fh = adiosfile
        self.varname = varname
        self.adiosvar = self.fh.inquire_variable(varname)
        self.steps = self.adiosvar.steps()

    def __getitem__(self, key: indexing.ExplicitIndexer) -> np.typing.ArrayLike:
        """Read a selection defined by an xarray ``ExplicitIndexer``."""
        return indexing.explicit_indexing_adapter(
            key,
            self.shape,
            indexing.IndexingSupport.BASIC,
            self._raw_indexing_method,
        )

    def _read_selection(self, step_selection, start: list, count: list):
        """Apply the step/box selection to the ADIOS2 variable and read it."""
        if step_selection is not None:
            self.adiosvar.set_step_selection(step_selection)
        self.adiosvar.set_selection([start, count])
        return self.fh.read(self.adiosvar)

    def _raw_indexing_method(self, key: tuple) -> np.typing.ArrayLike:
        """
        Convert xarray basic indexing into ADIOS2 selection calls and read.

        Parameters
        ----------
        key
            Tuple of integers and slices, one entry per xarray dimension.

        Returns
        -------
        numpy.ndarray
            The selected data. Axes indexed with an integer are dropped, as
            with NumPy basic indexing.

        Raises
        ------
        IndexError
            If a slice has a step other than 1.
        ValueError
            If the data returned by ADIOS2 does not have the selected size.

        Notes
        -----
        - ADIOS2 does not support stepped slicing (``slice.step != 1``).
        - ADIOS2 time is represented as "steps". If an ADIOS2 variable has
          steps, xarray's first dimension is treated as time and mapped via
          ``set_step_selection``.
        """
        has_steps = self.steps > 1
        start: list = []
        count: list = []
        step_selection = None
        int_axes: list = []

        for axis, (index, size) in enumerate(zip(key, self.shape)):
            if isinstance(index, slice):
                # indices() resolves None and negative bounds and clips to size.
                first, stop, stride = index.indices(size)
                if stride != 1:
                    msg = (
                        "The indexing operation with step != 1 you are attempting to perform "
                        "is not valid on ADIOS2.Variable object. "
                    )
                    raise IndexError(msg)
                extent = max(stop - first, 0)
            else:
                # Integer index: ADIOS2 offsets are 0-based, like the index.
                first = int(index)
                if first < 0:
                    first += size
                extent = 1
                int_axes.append(axis)

            if has_steps and axis == 0:
                # key[0] is the step selection, not a spatial dimension.
                step_selection = [first, extent]
            else:
                start.append(first)
                count.append(extent)

        if self.lock is None:
            data = self._read_selection(step_selection, start, count)
        else:
            with self.lock:
                data = self._read_selection(step_selection, start, count)

        # ADIOS2 has no time dimension: the selected steps come back folded
        # into the first dimension, so restore one axis per key entry using
        # the number of steps actually selected.
        if step_selection is None:
            expected_shape = tuple(count)
        else:
            expected_shape = (step_selection[1], *count)
        data = np.asarray(data)
        if data.size != int(np.prod(expected_shape, dtype=np.int64)):
            raise ValueError(
                f"BoutADIOSBackendArray: unexpected data size when reading "
                f"variable {self.varname!r}: got shape {data.shape}, "
                f"expected {expected_shape}"
            )
        data = data.reshape(expected_shape)

        # xarray expects integer-indexed axes to be removed by the backend.
        if int_axes:
            data = data.squeeze(axis=tuple(int_axes))

        if self.cast_dtype is not None:
            data = data.astype(self.cast_dtype, copy=False)
        return data
def attrs_of_var(varname: str, items: ItemsView, separator: str = "/"):
    """
    Collect the attribute entries that belong to one variable.

    Parameters
    ----------
    varname
        Name of the variable whose attributes are wanted.
    items
        ``(name, info)`` pairs of all available attributes.
    separator
        String that joins the variable name and the attribute name.

    Returns
    -------
    list
        The ``(name, info)`` pairs whose name starts with
        ``varname + separator``, in iteration order.
    """
    scope = varname + separator
    matches = []
    for name, info in items:
        if name.startswith(scope):
            matches.append((name, info))
    return matches
# pylint: disable=R0902  # Too many instance attributes
# pylint: disable=R0912  # Too many branches
# pylint: disable=E1121  # too-many-function-args
class BoutAdiosBackendEntrypoint(BackendEntrypoint):
    """
    xarray backend entrypoint for ADIOS2 ``.bp`` datasets.

    For more information about the underlying library, visit:
    https://adios2.readthedocs.io/en/stable

    Each call to ``open_dataset`` opens its own ADIOS2 handle and gives the
    returned dataset a closer bound to that handle, so one entrypoint instance
    can serve several datasets that are open at the same time.

    See Also
    --------
    backends.AdiosStore
    """

    description = "Open ADIOS2 files/folders (.bp) using adios2 in Xarray"
    url = "https://adios2.readthedocs.io/en/stable"

    def __init__(self):
        # Most recently opened handle and the one-shot closer paired with it.
        self._fh = None
        self._close_fh = None

    def close(self) -> None:
        """Close the most recently opened ADIOS2 file handle, if still open."""
        if self._close_fh is not None:
            self._close_fh()
        elif self._fh is not None:
            # Handle that was assigned without a paired closer.
            self._fh.close()
            self._fh = None

    def _track_handle(self, fh):
        """Record ``fh`` as the current handle and return a one-shot closer."""
        closed = False

        def close_handle() -> None:
            nonlocal closed
            if closed:
                return
            closed = True
            # Only clear the entrypoint state if a later open_dataset call
            # has not already replaced it with another handle.
            if self._fh is fh:
                self._fh = None
                self._close_fh = None
            fh.close()

        self._fh = fh
        self._close_fh = close_handle
        return close_handle

    def guess_can_open(
        self,
        filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
    ) -> bool:
        """Return True if this backend can open the provided filename."""
        if not isinstance(filename_or_obj, (str, os.PathLike)):
            return False
        # A ``.bp`` dataset may be a directory, so a trailing path separator
        # must not hide the extension.
        separators = os.sep + (os.altsep or "")
        path = os.fsdecode(filename_or_obj).rstrip(separators)
        return os.path.splitext(path)[1] == ".bp"

    def open_dataset(  # type: ignore[override]  # allow LSP violation, not supporting **kwargs
        self,
        filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
        *,
        drop_variables: str | Iterable[str] | None = None,
    ) -> Dataset:
        """
        Open an ADIOS2 ``.bp`` file/folder as an xarray Dataset.

        Parameters
        ----------
        filename_or_obj
            Path to a ``.bp`` dataset (directory or file, depending on ADIOS2
            engine).
        drop_variables
            Optional variable name or iterable of variable names to exclude.

        Returns
        -------
        Dataset
            Dataset whose array variables are read lazily from the file.
            Closing the dataset closes the ADIOS2 handle opened by this call.
        """
        from adios2 import FileReader

        # A bare string is one variable name, not a collection of characters
        # to match by substring.
        if drop_variables is None:
            dropped = frozenset()
        elif isinstance(drop_variables, str):
            dropped = frozenset((drop_variables,))
        else:
            dropped = frozenset(drop_variables)

        filename_or_obj = _normalize_path(filename_or_obj)
        fh = FileReader(filename_or_obj)
        close_handle = self._track_handle(fh)
        try:
            ds = self._build_dataset(fh, dropped)
        except Exception:
            # Do not leak the open handle when the file cannot be interpreted.
            close_handle()
            raise
        ds.set_close(close_handle)
        return ds

    def _build_dataset(self, fh, dropped: frozenset) -> Dataset:
        """Build the Dataset from every variable and attribute in ``fh``."""
        variables = fh.available_variables()
        attrs = fh.available_attributes()

        xvars = {}
        for varname, varinfo in variables.items():
            if varname in dropped:
                # Discard the dropped variable's attributes so they do not
                # end up as dataset-level attributes below.
                for aname, _ in attrs_of_var(varname, attrs.items(), "/"):
                    attrs.pop(aname)
                continue
            xvars[varname] = self._build_variable(fh, varname, varinfo, attrs)

        # Whatever was not claimed by a variable is a dataset attribute.
        ds_attrs = {}
        for attname in list(attrs):
            attr_value = fh.read_attribute(attname)
            if isinstance(attname, str) and attname.startswith(DATASET_ATTR_PREFIX):
                ds_attrs[attname[len(DATASET_ATTR_PREFIX) :]] = attr_value
            else:
                ds_attrs[attname] = attr_value

        return Dataset(xvars, None, ds_attrs)

    @staticmethod
    def _parse_dtype_attr(attr_value) -> np.dtype | None:
        """Interpret an original-dtype attribute; None if it is not a dtype."""
        if isinstance(attr_value, (list, tuple)) and len(attr_value) == 1:
            attr_value = attr_value[0]
        try:
            return np.dtype(str(attr_value))
        except (TypeError, ValueError):
            return None

    def _read_variable_attrs(self, fh, varname: str, attrs: dict):
        """
        Read the attributes scoped to ``varname`` and remove them from ``attrs``.

        Returns ``(dims, original_dtype, xattrs)``: the stored dimension names
        (or None), the stored original dtype (or None), and the remaining
        attributes keyed by their name without the ``varname/`` prefix.
        """
        prefix = varname + "/"
        dims_key = prefix + XARRAY_DIMS_ATTR
        dtype_key = prefix + XARRAY_ORIGINAL_DTYPE_ATTR

        dims = None
        original_dtype = None
        xattrs = {}
        # attrs_of_var returns a list, so popping from attrs here is safe.
        for aname, _ in attrs_of_var(varname, attrs.items(), "/"):
            attr_value = fh.read_attribute(aname)
            if aname == dims_key:
                dims = attr_value
            elif aname == dtype_key:
                original_dtype = self._parse_dtype_attr(attr_value)
            else:
                xattrs[aname[len(prefix) :]] = attr_value
            attrs.pop(aname)
        return dims, original_dtype, xattrs

    def _build_variable(self, fh, varname: str, varinfo: dict, attrs: dict) -> Variable:
        """Create the xarray Variable for one ADIOS2 variable."""
        shape_str = [s.strip() for s in varinfo["Shape"].split(",") if s.strip()]
        shape_list = [int(s) for s in shape_str]
        steps = int(varinfo["AvailableStepsCount"])

        dims, original_dtype, xattrs = self._read_variable_attrs(fh, varname, attrs)

        if shape_list:
            if dims is None:
                # No stored names: fall back to the size strings.
                # NOTE(review): two axes of equal size get the same name.
                dims = shape_str
            elif isinstance(dims, str):
                # A single dimension name may be read back as a bare string.
                dims = [dims]
            else:
                dims = list(dims)
            if steps > 1:
                # Steps become a leading synthetic time dimension.
                shape_list.insert(0, steps)
                dims.insert(0, "t")

            nptype = np.dtype(adios_to_numpy_type[varinfo["Type"]])
            cast_dtype = (
                original_dtype
                if original_dtype is not None and original_dtype != nptype
                else None
            )
            xdata = indexing.LazilyIndexedArray(
                BoutADIOSBackendArray(
                    shape_list,
                    nptype,
                    None,
                    fh,
                    varname,
                    cast_dtype=cast_dtype,
                )
            )
            # Explicit None test: a non-structured np.dtype is falsy, so
            # ``original_dtype or nptype`` would always pick nptype.
            encoded_dtype = original_dtype if original_dtype is not None else nptype
            return Variable(dims, xdata, attrs=xattrs, encoding={"dtype": encoded_dtype})

        if steps > 1:
            # Scalar with several steps: read all steps eagerly as a 1-D
            # series along "t".
            avar = fh.inquire_variable(varname)
            avar.set_step_selection([0, avar.steps()])
            data = np.asarray(fh.read(avar))
            if original_dtype is not None and data.dtype != original_dtype:
                data = data.astype(original_dtype, copy=False)
            return Variable("t", data, attrs=xattrs, encoding={"dtype": data.dtype})

        # Plain scalar (numeric or string): read eagerly.
        data = fh.read(varname)
        if original_dtype is not None and np.asarray(data).dtype != original_dtype:
            data = np.asarray(data).astype(original_dtype, copy=False)
        return Variable([], data, attrs=xattrs, encoding=None)