Source code for kerchunk.hdf

import io
import logging
import pathlib
from typing import Union, Any, Dict, List, Tuple

import fsspec.core
from fsspec.implementations.reference import LazyReferenceMapper
import numpy as np
import zarr
import numcodecs
from numcodecs.abc import Codec
from .codecs import FillStringsCodec
from .utils import (
    _encode_for_JSON,
    encode_fill_value,
    dict_to_store,
    translate_refs_serializable,
)

try:
    import h5py
except ModuleNotFoundError:  # pragma: no cover
    raise ImportError(
        "h5py is required for kerchunking HDF5/NetCDF4 files. Please install with "
        "`pip/conda install h5py`. See https://docs.h5py.org/en/latest/build.html "
        "for more details."
    )

try:
    import hdf5plugin  # noqa
except ModuleNotFoundError:
    hdf5plugin = None

lggr = logging.getLogger("h5-to-zarr")
_HIDDEN_ATTRS = {  # from h5netcdf.attrs
    "REFERENCE_LIST",
    "CLASS",
    "DIMENSION_LIST",
    "NAME",
    "_Netcdf4Dimid",
    "_Netcdf4Coordinates",
    "_nc3_strict",
    "_NCProperties",
}


class Hdf5FeatureNotSupported(RuntimeError):
    pass


class SingleHdf5ToZarr:
    """Translate the content of one HDF5 file into Zarr metadata.

    HDF5 groups become Zarr groups. HDF5 datasets become Zarr arrays. Zarr array
    chunks remain in the HDF5 file.

    Parameters
    ----------
    h5f : file-like, str, h5py.File or h5py.Group
        Input HDF5 file. Can be a binary Python file-like object (duck-typed,
        adhering to BinaryIO is optional), in which case ``url`` must also be
        provided. If a str, the file will be opened using fsspec and
        ``storage_options``.
    url : string
        URI of the HDF5 file, if passing a file-like object or h5py File/Group.
    spec : int
        The version of output to produce (see README of this repo).
    inline_threshold : int
        Include chunks smaller than this value directly in the output. Zero or
        negative to disable.
    storage_options : dict
        Passed to fsspec if ``h5f`` is a str.
    error : "warn" (default) | "pdb" | "ignore" | "raise"
    vlen_encode : ["embed", "null", "leave", "encode"]
        What to do with VLEN string variables or columns of tabular variables.
        leave: pass through the 16-byte garbage IDs unaffected, requires no codec
        null: set all the strings to None or empty; requires that this library
        is available at read time
        embed: include all the values in the output JSON (should not be used
        for large tables)
        encode: save the ID-to-value mapping in a codec, to produce the real
        values at read time; requires this library to be available. Can be
        efficient storage where there are few unique values.
    out : dict-like or None
        This allows you to supply an
        fsspec.implementations.reference.LazyReferenceMapper to write out
        parquet as the references get filled, or some other dictionary-like
        class to customise how references get stored.
    unsupported_inline_threshold : int or None
        Include chunks involving unsupported HDF5 features and smaller than this
        value directly in the output. None will use ``inline_threshold``.
    """

    def __init__(
        self,
        h5f: "io.BinaryIO | str | h5py.File | h5py.Group",
        url: str = None,
        spec=1,
        inline_threshold=500,
        storage_options=None,
        error="warn",
        vlen_encode="embed",
        out=None,
        unsupported_inline_threshold=None,
    ):
        # Open HDF5 file in read mode...
        lggr.debug(f"HDF5 file: {h5f}")

        if isinstance(h5f, (pathlib.Path, str)):
            fs, path = fsspec.core.url_to_fs(h5f, **(storage_options or {}))
            self.input_file = fs.open(path, "rb")
            url = h5f
            self._h5f = h5py.File(self.input_file, mode="r")
        elif isinstance(h5f, io.IOBase):
            self.input_file = h5f
            self._h5f = h5py.File(self.input_file, mode="r")
        elif isinstance(h5f, (h5py.File, h5py.Group)):
            # assume h5py object (File or group/dataset)
            self._h5f = h5f
            fs, path = fsspec.core.url_to_fs(url, **(storage_options or {}))
            self.input_file = fs.open(path, "rb")
        else:
            raise ValueError("type of input `h5f` not recognised")

        self.spec = spec
        self.inline = inline_threshold
        if vlen_encode not in ["embed", "null", "leave", "encode"]:
            raise NotImplementedError
        self.vlen = vlen_encode
        self.store_dict = out or {}
        self.store = dict_to_store(self.store_dict)
        self._zroot = zarr.group(store=self.store, zarr_format=2)

        self._uri = url
        self.error = error
        if unsupported_inline_threshold is None:
            unsupported_inline_threshold = inline_threshold or 100
        self.unsupported_inline_threshold = unsupported_inline_threshold
        lggr.debug(f"HDF5 file URI: {self._uri}")
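
    # Illustrative constructor sketches (comments only, so nothing executes at
    # class-definition time). The URLs and storage options below are assumptions
    # for the example, not defaults of this class:
    #
    #     # 1. a URL/path string, opened for you via fsspec:
    #     SingleHdf5ToZarr("s3://bucket/data.nc", storage_options={"anon": True})
    #
    #     # 2. an already-open binary file-like object, plus its URL:
    #     f = fsspec.open("s3://bucket/data.nc", "rb", anon=True).open()
    #     SingleHdf5ToZarr(f, url="s3://bucket/data.nc")
    #
    #     # 3. an h5py.File or h5py.Group, plus the URL of the underlying file:
    #     SingleHdf5ToZarr(h5py.File(f, "r"), url="s3://bucket/data.nc")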

    def translate(self, preserve_linked_dsets=False):
        """Translate the content of one HDF5 file into Zarr storage format.

        This method is the main entry point to execute the workflow, and
        returns a "reference" structure to be used with zarr/kerchunk.

        No data is copied out of the HDF5 file.

        Parameters
        ----------
        preserve_linked_dsets : bool (optional, default False)
            If True, translate HDF5 soft and hard links for each `h5py.Dataset`
            into the reference structure. Requires h5py version 3.11.0 or
            later. Will not translate external links or links to `h5py.Group`
            objects.

        Returns
        -------
        dict
            Dictionary containing reference structure.
        """
        lggr.debug("Translation begins")
        self._transfer_attrs(self._h5f, self._zroot)

        self._h5f.visititems(self._translator)

        if preserve_linked_dsets:
            if not has_visititems_links():
                raise Hdf5FeatureNotSupported(
                    "'preserve_linked_dsets' kwarg requires that h5py 3.11.0 or later "
                    f"is installed, found {h5py.__version__}"
                )
            self._h5f.visititems_links(self._translator)

        if self.spec < 1:
            return self.store
        elif isinstance(self.store, LazyReferenceMapper):
            self.store.flush()
            return self.store
        else:
            translate_refs_serializable(self.store_dict)
            store = _encode_for_JSON(self.store_dict)
            return {"version": 1, "refs": store}
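
    # A minimal end-to-end sketch of `translate` with the default spec=1 JSON
    # output (comments only; "example.nc" and "refs.json" are placeholder names
    # for the example, not anything this class creates):
    #
    #     refs = SingleHdf5ToZarr("example.nc", inline_threshold=300).translate()
    #     with open("refs.json", "w") as f:
    #         json.dump(refs, f)
    #
    #     # the references can then be read back through fsspec's "reference"
    #     # filesystem, giving a key/value mapper usable by zarr readers:
    #     fs = fsspec.filesystem("reference", fo="refs.json")
    #     mapper = fs.get_mapper("")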

    def _unref(self, ref):
        name = h5py.h5r.get_name(ref, self._h5f.id)
        return self._h5f[name]

    def _transfer_attrs(
        self,
        h5obj: Union[h5py.Dataset, h5py.Group],
        zobj: Union[zarr.Array, zarr.Group],
    ):
        """Transfer attributes from an HDF5 object to its equivalent Zarr object.

        Parameters
        ----------
        h5obj : h5py.Group or h5py.Dataset
            An HDF5 group or dataset.
        zobj : zarr.hierarchy.Group or zarr.core.Array
            An equivalent Zarr group or array to the HDF5 group or dataset with
            attributes.
        """
        upd = {}
        for n, v in h5obj.attrs.items():
            if n in _HIDDEN_ATTRS:
                continue

            # Fix some attribute values to avoid JSON encoding exceptions...
            if isinstance(v, bytes):
                v = v.decode("utf-8", errors="replace") or " "
            elif isinstance(v, (np.ndarray, np.number, np.bool_)):
                if v.dtype.kind == "S":
                    v = v.astype(str)
                if n == "_FillValue":
                    continue  # strip it out!
                elif v.size == 1:
                    v = v.flatten()[0]
                    if isinstance(v, (np.ndarray, np.number, np.bool_)):
                        v = v.tolist()
                else:
                    v = v.tolist()
            elif isinstance(v, h5py._hl.base.Empty):
                v = ""
            if v == "DIMENSION_SCALE":
                continue

            try:
                if isinstance(v, (str, int, float)):
                    upd[n] = v
                elif isinstance(v, (tuple, set, list)) and all(
                    isinstance(_, (str, int, float)) for _ in v
                ):
                    upd[n] = list(v)
                else:
                    upd[n] = str(v)
            except TypeError:
                lggr.debug(
                    f"TypeError transferring attr, skipping:\n {n}@{h5obj.name} = {v} ({type(v)})"
                )

        zobj.attrs.update(upd)

    def _decode_filters(self, h5obj: Union[h5py.Dataset, h5py.Group]):
        if h5obj.scaleoffset:
            raise Hdf5FeatureNotSupported(
                f"{h5obj.name} uses HDF5 scaleoffset filter - not supported by kerchunk"
            )
        if h5obj.compression in ("szip", "lzf"):
            raise Hdf5FeatureNotSupported(
                f"{h5obj.name} uses szip or lzf compression - not supported by kerchunk"
            )
        filters = []
        if h5obj.shuffle and h5obj.dtype.kind != "O":
            # cannot use shuffle if we materialised objects
            filters.append(numcodecs.Shuffle(elementsize=h5obj.dtype.itemsize))
        for filter_id, properties in h5obj._filters.items():
            if str(filter_id) == "32001":
                blosc_compressors = (
                    "blosclz",
                    "lz4",
                    "lz4hc",
                    "snappy",
                    "zlib",
                    "zstd",
                )
                (
                    _1,
                    _2,
                    bytes_per_num,
                    total_bytes,
                    clevel,
                    shuffle,
                    compressor,
                ) = properties
                pars = dict(
                    blocksize=total_bytes,
                    clevel=clevel,
                    shuffle=shuffle,
                    cname=blosc_compressors[compressor],
                )
                filters.append(numcodecs.Blosc(**pars))
            elif str(filter_id) == "32015":
                filters.append(numcodecs.Zstd(level=properties[0]))
            elif str(filter_id) == "gzip":
                filters.append(numcodecs.Zlib(level=properties))
            elif str(filter_id) == "32004":
                raise Hdf5FeatureNotSupported(
                    f"{h5obj.name} uses lz4 compression - not supported by kerchunk"
                )
            elif str(filter_id) == "32008":
                raise Hdf5FeatureNotSupported(
                    f"{h5obj.name} uses bitshuffle compression - not supported by kerchunk"
                )
            elif str(filter_id) == "shuffle":
                # already handled before this loop
                pass
            else:
                raise Hdf5FeatureNotSupported(
                    f"{h5obj.name} uses filter id {filter_id} with properties {properties},"
                    f" not supported by kerchunk."
                )
        return filters

    def _translator(
        self,
        name: str,
        h5obj: Union[
            h5py.Dataset, h5py.Group, h5py.SoftLink, h5py.HardLink, h5py.ExternalLink
        ],
    ):
        """Produce Zarr metadata for all groups and datasets in the HDF5 file."""
        try:  # method must not raise exception
            kwargs: Dict[str, Any] = {"compressor": None}
            if isinstance(h5obj, (h5py.SoftLink, h5py.HardLink)):
                h5obj = self._h5f[name]
                if isinstance(h5obj, h5py.Group):
                    # continues iteration of visititems_links
                    lggr.debug(
                        f"Skipping translation of HDF5 linked group: '{h5obj.name}'"
                    )
                    return None

            if isinstance(h5obj, h5py.Dataset):
                lggr.debug(f"HDF5 dataset: {h5obj.name}")
                lggr.debug(f"HDF5 compression: {h5obj.compression}")
                if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT:
                    # Only do if h5obj.nbytes < self.inline??
                    # kwargs["data"] = h5obj[:]
                    if h5obj.nbytes < self.inline:
                        kwargs["data"] = h5obj[:]
                    kwargs["filters"] = []
                else:
                    try:
                        kwargs["filters"] = self._decode_filters(h5obj)
                    except Hdf5FeatureNotSupported:
                        if h5obj.nbytes < self.unsupported_inline_threshold:
                            kwargs["data"] = _read_unsupported_direct(h5obj)
                            kwargs["filters"] = []
                        else:
                            raise
                dt = None
                # Get storage info of this HDF5 dataset...
                if "data" in kwargs:
                    cinfo = _NULL_CHUNK_INFO
                else:
                    try:
                        cinfo = self._storage_info_and_adj_filters(
                            h5obj, kwargs["filters"]
                        )
                    except Hdf5FeatureNotSupported:
                        if h5obj.nbytes < self.unsupported_inline_threshold:
                            kwargs["data"] = _read_unsupported_direct(h5obj)
                            kwargs["filters"] = []
                            cinfo = _NULL_CHUNK_INFO
                        else:
                            raise

                if "data" in kwargs:
                    fill = None
                else:
                    # encodings
                    if h5obj.dtype.kind in "US":
                        fill = h5obj.fillvalue or " "  # cannot be None
                    elif h5obj.dtype.kind == "O":
                        if self.vlen == "embed":
                            if np.isscalar(h5obj):
                                out = str(h5obj)
                            elif h5obj.ndim == 0:
                                out = np.array(h5obj).tolist().decode()
                            else:
                                out = h5obj[:]
                                out2 = out.ravel()
                                for i, val in enumerate(out2):
                                    if isinstance(val, bytes):
                                        out2[i] = val.decode()
                                    elif isinstance(val, str):
                                        out2[i] = val
                                    elif isinstance(val, h5py.h5r.Reference):
                                        # TODO: recursively recreate references
                                        out2[i] = None
                                    else:
                                        out2[i] = [
                                            v.decode() if isinstance(v, bytes) else v
                                            for v in val
                                        ]
                            kwargs["data"] = out
                            kwargs["filters"] = [numcodecs.VLenUTF8()]
                            fill = None
                        elif self.vlen == "null":
                            dt = "O"
                            kwargs["filters"] = [FillStringsCodec(dtype="S16")]
                            fill = " "
                        elif self.vlen == "leave":
                            dt = "S16"
                            fill = " "
                        elif self.vlen == "encode":
                            assert len(cinfo) == 1
                            v = list(cinfo.values())[0]
                            data = _read_block(self.input_file, v["offset"], v["size"])
                            indexes = np.frombuffer(data, dtype="S16")
                            labels = h5obj[:]
                            mapping = {
                                index.decode(): label.decode()
                                for index, label in zip(indexes, labels)
                            }
                            kwargs["filters"] = [
                                FillStringsCodec(dtype="S16", id_map=mapping)
                            ]
                            fill = " "
                        else:
                            raise NotImplementedError
                    elif _is_netcdf_datetime(h5obj) or _is_netcdf_variable(h5obj):
                        fill = None
                    else:
                        fill = h5obj.fillvalue
                    if h5obj.dtype.kind == "V":
                        fill = None
                        if self.vlen == "encode":
                            assert len(cinfo) == 1
                            v = list(cinfo.values())[0]
                            dt = [
                                (
                                    v,
                                    (
                                        "S16"
                                        if h5obj.dtype[v].kind == "O"
                                        else str(h5obj.dtype[v])
                                    ),
                                )
                                for v in h5obj.dtype.names
                            ]
                            data = _read_block(self.input_file, v["offset"], v["size"])
                            labels = h5obj[:]
                            arr = np.frombuffer(data, dtype=dt)
                            mapping = {}
                            for field in labels.dtype.names:
                                if labels[field].dtype == "O":
                                    mapping.update(
                                        {
                                            index.decode(): label.decode()
                                            for index, label in zip(
                                                arr[field], labels[field]
                                            )
                                        }
                                    )
                            kwargs["filters"] = [
                                FillStringsCodec(dtype=str(dt), id_map=mapping)
                            ]
                            dt = [
                                (
                                    v,
                                    (
                                        "O"
                                        if h5obj.dtype[v].kind == "O"
                                        else str(h5obj.dtype[v])
                                    ),
                                )
                                for v in h5obj.dtype.names
                            ]
== "null": dt = [ ( v, ( "S16" if h5obj.dtype[v].kind == "O" else str(h5obj.dtype[v]) ), ) for v in h5obj.dtype.names ] kwargs["filters"] = [FillStringsCodec(dtype=str(dt))] dt = [ ( v, ( "O" if h5obj.dtype[v].kind == "O" else str(h5obj.dtype[v]) ), ) for v in h5obj.dtype.names ] elif self.vlen == "leave": dt = [ ( v, ( "S16" if h5obj.dtype[v].kind == "O" else h5obj.dtype[v] ), ) for v in h5obj.dtype.names ] elif self.vlen == "embed": # embed fails due to https://github.com/zarr-developers/numcodecs/issues/333 data = h5obj[:].tolist() data2 = [] for d in data: data2.append( [ ( _.decode(errors="ignore") if isinstance(_, bytes) else _ ) for _ in d ] ) dt = "O" kwargs["data"] = data2 kwargs["filters"] = [numcodecs.VLenUTF8()] fill = None else: raise NotImplementedError if h5py.h5ds.is_scale(h5obj.id) and not cinfo: return if h5obj.attrs.get("_FillValue") is not None: fill = h5obj.attrs.get("_FillValue") fill = encode_fill_value(fill, dt or h5obj.dtype) adims = self._get_array_dims(h5obj) # Create a Zarr array equivalent to this HDF5 dataset. data = kwargs.pop("data", None) if (dt or h5obj.dtype) == object: dt = "string" za = self._zroot.require_array( name=h5obj.name.lstrip("/"), shape=h5obj.shape, dtype=dt or h5obj.dtype, chunks=h5obj.chunks or h5obj.shape, fill_value=fill, attributes={ "_ARRAY_DIMENSIONS": adims, }, overwrite=True, **kwargs, ) lggr.debug(f"Created Zarr array: {za}") self._transfer_attrs(h5obj, za) lggr.debug(f"_ARRAY_DIMENSIONS = {adims}") if data is not None: try: za[:] = data except (ValueError, TypeError): store_key = f"{za.path}/{'.'.join('0' * h5obj.ndim)}" self.store_dict[store_key] = _filters_encode_data( data, kwargs["filters"] ) return # Store chunk location metadata... if cinfo: for k, v in cinfo.items(): key = ( str.removeprefix(h5obj.name, "/") + "/" + ".".join(map(str, k)) ) if "data" in v: self.store_dict[key] = v["data"] continue if h5obj.fletcher32: logging.info("Discarding fletcher32 checksum") v["size"] -= 4 key = ( str.removeprefix(h5obj.name, "/") + "/" + ".".join(map(str, k)) ) if ( self.inline and isinstance(v, dict) and v["size"] < self.inline ): self.input_file.seek(v["offset"]) data = self.input_file.read(v["size"]) # Removed the encoding, as will finally be encoded_for_JSON # try: # # easiest way to test if data is ascii # data.decode("ascii") # except UnicodeDecodeError: # data = b"base64:" + base64.b64encode(data) self.store_dict[key] = data else: self.store_dict[key] = [ self._uri, v["offset"], v["size"], ] elif isinstance(h5obj, h5py.Group): lggr.debug(f"HDF5 group: {h5obj.name}") zgrp = self._zroot.require_group(h5obj.name.lstrip("/")) self._transfer_attrs(h5obj, zgrp) except Exception as e: import traceback msg = "\n".join( [ "The following exception was caught and quashed while traversing HDF5", str(e), traceback.format_exc(limit=5), ] ) if self.error == "ignore": return elif self.error == "pdb": print(msg) import pdb pdb.post_mortem() elif self.error == "raise": raise else: # "warn" or anything else, the default import warnings warnings.warn(msg) del e # garbage collect def _get_array_dims(self, dset): """Get a list of dimension scale names attached to input HDF5 dataset. This is required by the xarray package to work with Zarr arrays. Only one dimension scale per dataset dimension is allowed. If dataset is dimension scale, it will be considered as the dimension to itself. Parameters ---------- dset : h5py.Dataset HDF5 dataset. Returns ------- list List with HDF5 path names of dimension scales attached to input dataset. 
""" dims = list() rank = len(dset.shape) if rank: for n in range(rank): num_scales = len(dset.dims[n]) if num_scales == 1: dims.append(dset.dims[n][0].name[1:]) elif h5py.h5ds.is_scale(dset.id): dims.append(dset.name[1:]) elif num_scales > 1: raise Hdf5FeatureNotSupported( f"{dset.name}: {len(dset.dims[n])} " f"dimension scales attached to dimension #{n}" ) elif num_scales == 0: # Some HDF5 files do not have dimension scales. # If this is the case, `num_scales` will be 0. # In this case, we mimic netCDF4 and assign phony dimension names. # See https://github.com/fsspec/kerchunk/issues/41 dims.append(f"phony_dim_{n}") return dims def _storage_info_and_adj_filters(self, dset: h5py.Dataset, filters: list) -> dict: """Get storage information of an HDF5 dataset in the HDF5 file. Storage information consists of file offset and size (length) for every chunk of the HDF5 dataset. HDF5 dataset also configs for each chunk which filters are skipped by `filter_mask` (mostly in the case where a chunk is small or when write directly with low-level api like `H5Dwrite_chunk`/`DatasetID.write_direct_chunk`), hence a filter will be cleared if the first chunk does not apply it. `Hdf5FeatherNotSupported` will be raised if chucks have heterogeneous `filter_mask` and alien chunks cannot be inlined. Parameters ---------- dset : h5py.Dataset HDF5 dataset for which to collect storage information. filters: list List of filters to apply to the HDF5 dataset. Will be modified in place if some filters not applied. Returns ------- dict HDF5 dataset storage information. Dict keys are chunk array offsets as tuples. Dict values are pairs with chunk file offset and size integers, or data content if in need of inline for some reason. """ # Empty (null) dataset... if dset.shape is None: return _NULL_CHUNK_INFO dsid = dset.id if dset.chunks is None: # Contiguous dataset... if dsid.get_offset() is None: # No data ever written... return _NULL_CHUNK_INFO else: key = (0,) * (len(dset.shape) or 1) return { key: {"offset": dsid.get_offset(), "size": dsid.get_storage_size()} } else: # Chunked dataset... num_chunks = dsid.get_num_chunks() if num_chunks == 0: # No data ever written... return _NULL_CHUNK_INFO # Go over all the dataset chunks... 
            stinfo = dict()
            chunk_size = dset.chunks
            filter_mask = None  # type: None | int

            def get_key(blob):
                return tuple([a // b for a, b in zip(blob.chunk_offset, chunk_size)])

            def filter_filters_by_mask(
                filter_list: List[Codec], filter_mask_
            ) -> List[Codec]:
                # use [2:] to strip the leading `0b` and [::-1] to reverse the order
                bin_mask = bin(filter_mask_)[2:][::-1]
                filters_rest = [
                    ifilter
                    for ifilter, imask in zip(filter_list, bin_mask)
                    if imask == "0"
                ]
                return filters_rest

            def store_chunk_info(blob):
                nonlocal filter_mask
                if filter_mask is None:
                    filter_mask = blob.filter_mask
                elif filter_mask != blob.filter_mask:
                    if blob.size < self.unsupported_inline_threshold:
                        data_slc = tuple(
                            slice(dim_offset, min(dim_offset + dim_chunk, dim_bound))
                            for dim_offset, dim_chunk, dim_bound in zip(
                                blob.chunk_offset, chunk_size, dset.shape
                            )
                        )
                        data = _read_unsupported_direct(dset, data_slc)
                        if data.shape != chunk_size:
                            bg = np.full(
                                chunk_size, dset.fillvalue, dset.dtype, order="C"
                            )
                            bg[tuple(slice(0, d) for d in data.shape)] = data
                            data_flatten = bg.reshape(-1)
                        else:
                            data_flatten = data.reshape(-1)
                        encoded = _filters_encode_data(
                            data_flatten, filter_filters_by_mask(filters, filter_mask)
                        )
                        stinfo[get_key(blob)] = {"data": encoded}
                        return
                    else:
                        raise Hdf5FeatureNotSupported(
                            f"Dataset {dset.name} has heterogeneous `filter_mask` - "
                            f"not supported by kerchunk."
                        )
                stinfo[get_key(blob)] = {"offset": blob.byte_offset, "size": blob.size}

            has_chunk_iter = callable(getattr(dsid, "chunk_iter", None))
            if has_chunk_iter:
                dsid.chunk_iter(store_chunk_info)
            else:
                for index in range(num_chunks):
                    store_chunk_info(dsid.get_chunk_info(index))

            # In most cases, the `filter_mask` should be zero, which means that
            # all filters are applied. If a filter is skipped, the corresponding
            # bit in the mask will be set to 1.
            if filter_mask is not None and filter_mask != 0:
                filters[:] = filter_filters_by_mask(filters, filter_mask)

            return stinfo
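

# Worked example of the `filter_mask` handling in `_storage_info_and_adj_filters`
# (illustrative only; the codec instances stand in for whatever `_decode_filters`
# produced): for a decoded pipeline [numcodecs.Shuffle(...), numcodecs.Zlib(...)]
# and a chunk reporting filter_mask == 0b10, bit 0 is unset (shuffle applied) and
# bit 1 is set (gzip skipped), so `filter_filters_by_mask(filters, 0b10)` returns
# only the Shuffle codec and the Zarr filter metadata is trimmed accordingly.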


def _read_unsupported_direct(dset, slc: Union[slice, Tuple[slice, ...]] = slice(None)):
    try:
        return dset[slc]
    # h5py may raise OSError/ValueError/RuntimeError when a required filter is
    # not registered; without `hdf5plugin` imported this can surface as an
    # `OSError` with an unhelpful message. Catch all exceptions here, since the
    # exception is re-raised regardless.
    except Exception:
        if hdf5plugin is None:
            import warnings

            warnings.warn(
                "Attempt to directly read h5-dataset via `h5py` failed. It is recommended to "
                "install `hdf5plugin` so that we can register extra filters, and then try again."
            )
        raise


def _simple_type(x):
    if isinstance(x, bytes):
        return x.decode()
    if isinstance(x, np.number):
        if x.dtype.kind == "i":
            return int(x)
        return float(x)
    return x


def _read_block(open_file, offset, size):
    place = open_file.tell()
    open_file.seek(offset)
    data = open_file.read(size)
    open_file.seek(place)
    return data


def _is_netcdf_datetime(dataset: h5py.Dataset):
    units = dataset.attrs.get("units")
    if isinstance(units, bytes):
        units = units.decode("utf-8")
    # This is the same heuristic used by xarray
    # https://github.com/pydata/xarray/blob/f8bae5974ee2c3f67346298da12621af4cae8cf8/xarray/coding/times.py#L670
    return units and "since" in units


def _is_netcdf_variable(dataset: h5py.Dataset):
    return any("_Netcdf4" in _ for _ in dataset.attrs)


def has_visititems_links():
    return hasattr(h5py.Group, "visititems_links")


def _filters_encode_data(data, filters: List[Codec]):
    for ifilter in filters:
        data = ifilter.encode(data)
    return bytes(data)


_NULL_CHUNK_INFO = dict()
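

# The block below is an optional, illustrative demo of writing references to a
# parquet store via the `out=` parameter; it only runs when this module is
# executed directly. The "example.nc" input and "refs.parquet" output paths are
# placeholders, and it assumes an fsspec version that provides
# `LazyReferenceMapper.create`.
if __name__ == "__main__":  # pragma: no cover - illustrative sketch only
    import fsspec

    local_fs = fsspec.filesystem("file")
    out = LazyReferenceMapper.create(root="refs.parquet", fs=local_fs)
    SingleHdf5ToZarr("example.nc", out=out).translate()
    # flush any remaining buffered references (harmless if already flushed)
    out.flush()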