Source code for kerchunk.hdf

import base64
import io
import logging
from typing import Union, BinaryIO

import fsspec.core
from fsspec.implementations.reference import LazyReferenceMapper
import numpy as np
import zarr
from zarr.meta import encode_fill_value
import numcodecs

from .codecs import FillStringsCodec
from .utils import _encode_for_JSON

try:
    import h5py
except ModuleNotFoundError:  # pragma: no cover
    raise ImportError(
        "h5py is required for kerchunking HDF5/NetCDF4 files. Please install with "
        "`pip/conda install h5py`. See https://docs.h5py.org/en/latest/build.html "
        "for more details."
    )

lggr = logging.getLogger("h5-to-zarr")
_HIDDEN_ATTRS = {  # from h5netcdf.attrs
    "REFERENCE_LIST",
    "CLASS",
    "DIMENSION_LIST",
    "NAME",
    "_Netcdf4Dimid",
    "_Netcdf4Coordinates",
    "_nc3_strict",
    "_NCProperties",
}


[docs] class SingleHdf5ToZarr: """Translate the content of one HDF5 file into Zarr metadata. HDF5 groups become Zarr groups. HDF5 datasets become Zarr arrays. Zarr array chunks remain in the HDF5 file. Parameters ---------- h5f : file-like or str Input HDF5 file. Can be a binary Python file-like object (duck-typed, adhering to BinaryIO is optional), in which case must also provide url. If a str, file will be opened using fsspec and storage_options. url : string URI of the HDF5 file, if passing a file-like object or h5py File/Group spec : int The version of output to produce (see README of this repo) inline_threshold : int Include chunks smaller than this value directly in the output. Zero or negative to disable storage_options: dict passed to fsspec if h5f is a str error: "warn" (default) | "pdb" | "ignore" | "raise" vlen_encode: ["embed", "null", "leave", "encode"] What to do with VLEN string variables or columns of tabular variables leave: pass through the 16byte garbage IDs unaffected, but requires no codec null: set all the strings to None or empty; required that this library is available at read time embed: include all the values in the output JSON (should not be used for large tables) encode: save the ID-to-value mapping in a codec, to produce the real values at read time; requires this library to be available. Can be efficient storage where there are few unique values. out: dict-like or None This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper to write out parquet as the references get filled, or some other dictionary-like class to customise how references get stored """ def __init__( self, h5f: "BinaryIO | str | h5py.File | h5py.Group", url: str = None, spec=1, inline_threshold=500, storage_options=None, error="warn", vlen_encode="embed", out=None, ): # Open HDF5 file in read mode... lggr.debug(f"HDF5 file: {h5f}") if isinstance(h5f, str): fs, path = fsspec.core.url_to_fs(h5f, **(storage_options or {})) self.input_file = fs.open(path, "rb") url = h5f self._h5f = h5py.File(self.input_file, mode="r") elif isinstance(h5f, io.IOBase): self.input_file = h5f self._h5f = h5py.File(self.input_file, mode="r") elif isinstance(h5f, (h5py.File, h5py.Group)): # assume h5py object (File or group/dataset) self._h5f = h5f fs, path = fsspec.core.url_to_fs(url, **(storage_options or {})) self.input_file = fs.open(path, "rb") else: raise ValueError("type of input `h5f` not recognised") self.spec = spec self.inline = inline_threshold if vlen_encode not in ["embed", "null", "leave", "encode"]: raise NotImplementedError self.vlen = vlen_encode self.store = out or {} self._zroot = zarr.group(store=self.store, overwrite=True) self._uri = url self.error = error lggr.debug(f"HDF5 file URI: {self._uri}")
[docs] def translate(self): """Translate content of one HDF5 file into Zarr storage format. This method is the main entry point to execute the workflow, and returns a "reference" structure to be used with zarr/kerchunk No data is copied out of the HDF5 file. Returns ------- dict Dictionary containing reference structure. """ lggr.debug("Translation begins") self._transfer_attrs(self._h5f, self._zroot) self._h5f.visititems(self._translator) if self.spec < 1: return self.store elif isinstance(self.store, LazyReferenceMapper): self.store.flush() return self.store else: store = _encode_for_JSON(self.store) return {"version": 1, "refs": store}
def _unref(self, ref): name = h5py.h5r.get_name(ref, self._h5f.id) return self._h5f[name] def _transfer_attrs( self, h5obj: Union[h5py.Dataset, h5py.Group], zobj: Union[zarr.Array, zarr.Group], ): """Transfer attributes from an HDF5 object to its equivalent Zarr object. Parameters ---------- h5obj : h5py.Group or h5py.Dataset An HDF5 group or dataset. zobj : zarr.hierarchy.Group or zarr.core.Array An equivalent Zarr group or array to the HDF5 group or dataset with attributes. """ for n, v in h5obj.attrs.items(): if n in _HIDDEN_ATTRS: continue # Fix some attribute values to avoid JSON encoding exceptions... if isinstance(v, bytes): v = v.decode("utf-8") or " " elif isinstance(v, (np.ndarray, np.number, np.bool_)): if v.dtype.kind == "S": v = v.astype(str) if n == "_FillValue": continue # strip it out! elif v.size == 1: v = v.flatten()[0] if isinstance(v, (np.ndarray, np.number, np.bool_)): v = v.tolist() else: v = v.tolist() elif isinstance(v, h5py._hl.base.Empty): v = "" if v == "DIMENSION_SCALE": continue try: zobj.attrs[n] = v except TypeError: lggr.debug( f"TypeError transferring attr, skipping:\n {n}@{h5obj.name} = {v} ({type(v)})" ) def _decode_filters(self, h5obj: Union[h5py.Dataset, h5py.Group]): if h5obj.scaleoffset: raise RuntimeError( f"{h5obj.name} uses HDF5 scaleoffset filter - not supported by kerchunk" ) if h5obj.compression in ("szip", "lzf"): raise RuntimeError( f"{h5obj.name} uses szip or lzf compression - not supported by kerchunk" ) filters = [] if h5obj.shuffle and h5obj.dtype.kind != "O": # cannot use shuffle if we materialised objects filters.append(numcodecs.Shuffle(elementsize=h5obj.dtype.itemsize)) for filter_id, properties in h5obj._filters.items(): if str(filter_id) == "32001": blosc_compressors = ( "blosclz", "lz4", "lz4hc", "snappy", "zlib", "zstd", ) ( _1, _2, bytes_per_num, total_bytes, clevel, shuffle, compressor, ) = properties pars = dict( blocksize=total_bytes, clevel=clevel, shuffle=shuffle, cname=blosc_compressors[compressor], ) filters.append(numcodecs.Blosc(**pars)) elif str(filter_id) == "32015": filters.append(numcodecs.Zstd(level=properties[0])) elif str(filter_id) == "gzip": filters.append(numcodecs.Zlib(level=properties)) elif str(filter_id) == "32004": raise RuntimeError( f"{h5obj.name} uses lz4 compression - not supported by kerchunk" ) elif str(filter_id) == "32008": raise RuntimeError( f"{h5obj.name} uses bitshuffle compression - not supported by kerchunk" ) elif str(filter_id) == "shuffle": # already handled before this loop pass else: raise RuntimeError( f"{h5obj.name} uses filter id {filter_id} with properties {properties}," f" not supported by kerchunk." ) return filters def _translator(self, name: str, h5obj: Union[h5py.Dataset, h5py.Group]): """Produce Zarr metadata for all groups and datasets in the HDF5 file.""" try: # method must not raise exception kwargs = {} if isinstance(h5obj, h5py.Dataset): lggr.debug(f"HDF5 dataset: {h5obj.name}") lggr.debug(f"HDF5 compression: {h5obj.compression}") if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT: # Only do if h5obj.nbytes < self.inline?? kwargs["data"] = h5obj[:] filters = [] else: filters = self._decode_filters(h5obj) dt = None # Get storage info of this HDF5 dataset... cinfo = self._storage_info(h5obj) if "data" in kwargs: fill = None else: # encodings if h5obj.dtype.kind in "US": fill = h5obj.fillvalue or " " # cannot be None elif h5obj.dtype.kind == "O": if self.vlen == "embed": if np.isscalar(h5obj): out = str(h5obj) elif h5obj.ndim == 0: out = np.array(h5obj).tolist().decode() else: out = h5obj[:] out2 = out.ravel() for i, val in enumerate(out2): if isinstance(val, bytes): out2[i] = val.decode() elif isinstance(val, str): out2[i] = val elif isinstance(val, h5py.h5r.Reference): # TODO: recursively recreate references out2[i] = None else: out2[i] = [ v.decode() if isinstance(v, bytes) else v for v in val ] kwargs["data"] = out kwargs["object_codec"] = numcodecs.JSON() fill = None elif self.vlen == "null": dt = "O" kwargs["object_codec"] = FillStringsCodec(dtype="S16") fill = " " elif self.vlen == "leave": dt = "S16" fill = " " elif self.vlen == "encode": assert len(cinfo) == 1 v = list(cinfo.values())[0] data = _read_block(self.input_file, v["offset"], v["size"]) indexes = np.frombuffer(data, dtype="S16") labels = h5obj[:] mapping = { index.decode(): label.decode() for index, label in zip(indexes, labels) } kwargs["object_codec"] = FillStringsCodec( dtype="S16", id_map=mapping ) fill = " " else: raise NotImplementedError elif _is_netcdf_datetime(h5obj) or _is_netcdf_variable(h5obj): fill = None else: fill = h5obj.fillvalue if h5obj.dtype.kind == "V": fill = None if self.vlen == "encode": assert len(cinfo) == 1 v = list(cinfo.values())[0] dt = [ ( v, ( "S16" if h5obj.dtype[v].kind == "O" else str(h5obj.dtype[v]) ), ) for v in h5obj.dtype.names ] data = _read_block(self.input_file, v["offset"], v["size"]) labels = h5obj[:] arr = np.frombuffer(data, dtype=dt) mapping = {} for field in labels.dtype.names: if labels[field].dtype == "O": mapping.update( { index.decode(): label.decode() for index, label in zip( arr[field], labels[field] ) } ) kwargs["object_codec"] = FillStringsCodec( dtype=str(dt), id_map=mapping ) dt = [ ( v, ( "O" if h5obj.dtype[v].kind == "O" else str(h5obj.dtype[v]) ), ) for v in h5obj.dtype.names ] elif self.vlen == "null": dt = [ ( v, ( "S16" if h5obj.dtype[v].kind == "O" else str(h5obj.dtype[v]) ), ) for v in h5obj.dtype.names ] kwargs["object_codec"] = FillStringsCodec(dtype=str(dt)) dt = [ ( v, ( "O" if h5obj.dtype[v].kind == "O" else str(h5obj.dtype[v]) ), ) for v in h5obj.dtype.names ] elif self.vlen == "leave": dt = [ ( v, ( "S16" if h5obj.dtype[v].kind == "O" else h5obj.dtype[v] ), ) for v in h5obj.dtype.names ] elif self.vlen == "embed": # embed fails due to https://github.com/zarr-developers/numcodecs/issues/333 data = h5obj[:].tolist() data2 = [] for d in data: data2.append( [ ( _.decode(errors="ignore") if isinstance(_, bytes) else _ ) for _ in d ] ) dt = "O" kwargs["data"] = data2 kwargs["object_codec"] = numcodecs.JSON() fill = None else: raise NotImplementedError if h5py.h5ds.is_scale(h5obj.id) and not cinfo: return if h5obj.attrs.get("_FillValue") is not None: fill = encode_fill_value( h5obj.attrs.get("_FillValue"), dt or h5obj.dtype ) # Create a Zarr array equivalent to this HDF5 dataset... za = self._zroot.create_dataset( h5obj.name, shape=h5obj.shape, dtype=dt or h5obj.dtype, chunks=h5obj.chunks or False, fill_value=fill, compression=None, filters=filters, overwrite=True, **kwargs, ) lggr.debug(f"Created Zarr array: {za}") self._transfer_attrs(h5obj, za) adims = self._get_array_dims(h5obj) za.attrs["_ARRAY_DIMENSIONS"] = adims lggr.debug(f"_ARRAY_DIMENSIONS = {adims}") if "data" in kwargs: return # embedded bytes, no chunks to copy # Store chunk location metadata... if cinfo: for k, v in cinfo.items(): if h5obj.fletcher32: logging.info("Discarding fletcher32 checksum") v["size"] -= 4 if ( self.inline and isinstance(v, dict) and v["size"] < self.inline ): self.input_file.seek(v["offset"]) data = self.input_file.read(v["size"]) try: # easiest way to test if data is ascii data.decode("ascii") except UnicodeDecodeError: data = b"base64:" + base64.b64encode(data) self.store[za._chunk_key(k)] = data else: self.store[za._chunk_key(k)] = [ self._uri, v["offset"], v["size"], ] elif isinstance(h5obj, h5py.Group): lggr.debug(f"HDF5 group: {h5obj.name}") zgrp = self._zroot.create_group(h5obj.name) self._transfer_attrs(h5obj, zgrp) except Exception as e: import traceback msg = "\n".join( [ "The following excepion was caught and quashed while traversing HDF5", str(e), traceback.format_exc(limit=5), ] ) if self.error == "ignore": return elif self.error == "pdb": print(msg) import pdb pdb.post_mortem() elif self.error == "raise": raise else: # "warn" or anything else, the default import warnings warnings.warn(msg) del e # garbage collect def _get_array_dims(self, dset): """Get a list of dimension scale names attached to input HDF5 dataset. This is required by the xarray package to work with Zarr arrays. Only one dimension scale per dataset dimension is allowed. If dataset is dimension scale, it will be considered as the dimension to itself. Parameters ---------- dset : h5py.Dataset HDF5 dataset. Returns ------- list List with HDF5 path names of dimension scales attached to input dataset. """ dims = list() rank = len(dset.shape) if rank: for n in range(rank): num_scales = len(dset.dims[n]) if num_scales == 1: dims.append(dset.dims[n][0].name[1:]) elif h5py.h5ds.is_scale(dset.id): dims.append(dset.name[1:]) elif num_scales > 1: raise RuntimeError( f"{dset.name}: {len(dset.dims[n])} " f"dimension scales attached to dimension #{n}" ) elif num_scales == 0: # Some HDF5 files do not have dimension scales. # If this is the case, `num_scales` will be 0. # In this case, we mimic netCDF4 and assign phony dimension names. # See https://github.com/fsspec/kerchunk/issues/41 dims.append(f"phony_dim_{n}") return dims def _storage_info(self, dset: h5py.Dataset) -> dict: """Get storage information of an HDF5 dataset in the HDF5 file. Storage information consists of file offset and size (length) for every chunk of the HDF5 dataset. Parameters ---------- dset : h5py.Dataset HDF5 dataset for which to collect storage information. Returns ------- dict HDF5 dataset storage information. Dict keys are chunk array offsets as tuples. Dict values are pairs with chunk file offset and size integers. """ # Empty (null) dataset... if dset.shape is None: return dict() dsid = dset.id if dset.chunks is None: # Contiguous dataset... if dsid.get_offset() is None: # No data ever written... return dict() else: key = (0,) * (len(dset.shape) or 1) return { key: {"offset": dsid.get_offset(), "size": dsid.get_storage_size()} } else: # Chunked dataset... num_chunks = dsid.get_num_chunks() if num_chunks == 0: # No data ever written... return dict() # Go over all the dataset chunks... stinfo = dict() chunk_size = dset.chunks def get_key(blob): return tuple([a // b for a, b in zip(blob.chunk_offset, chunk_size)]) def store_chunk_info(blob): stinfo[get_key(blob)] = {"offset": blob.byte_offset, "size": blob.size} has_chunk_iter = callable(getattr(dsid, "chunk_iter", None)) if has_chunk_iter: dsid.chunk_iter(store_chunk_info) else: for index in range(num_chunks): store_chunk_info(dsid.get_chunk_info(index)) return stinfo
def _simple_type(x): if isinstance(x, bytes): return x.decode() if isinstance(x, np.number): if x.dtype.kind == "i": return int(x) return float(x) return x def _read_block(open_file, offset, size): place = open_file.tell() open_file.seek(offset) data = open_file.read(size) open_file.seek(place) return data def _is_netcdf_datetime(dataset: h5py.Dataset): units = dataset.attrs.get("units") if isinstance(units, bytes): units = units.decode("utf-8") # This is the same heuristic used by xarray # https://github.com/pydata/xarray/blob/f8bae5974ee2c3f67346298da12621af4cae8cf8/xarray/coding/times.py#L670 return units and "since" in units def _is_netcdf_variable(dataset: h5py.Dataset): return any("_Netcdf4" in _ for _ in dataset.attrs)