Advanced Topics

Using Dask

Scanning and combining datasets can be computationally intensive and may require a lot of bandwidth for some data formats. Where the target data consists of many input files, it makes sense to parallelise the job with Dask, and perhaps to distribute the workload over a cluster to gain additional CPUs and network bandwidth.

Simple parallel

The simplest case is processing many individual files in parallel. Let’s say you have a list of input files; you will need to encapsulate the processing of each file in a single function. In this mode, it is typical to save the single-file outputs to files, although returning them is fine too (especially if you mean to combine them immediately).

Here is an example for HDF5 files. The caller should make sure the storage options and any parameters needed for the transformer are in place.

import ujson, fsspec, dask
import kerchunk.hdf

def process(url, outputfile, storage_options_in={}, storage_options_out={}):
    # scan one HDF5 file and write its reference set out as JSON
    transformer = kerchunk.hdf.SingleHdf5ToZarr(url, storage_options=storage_options_in)
    refs = transformer.translate()
    with fsspec.open(outputfile, mode="wt", **storage_options_out) as f:
        ujson.dump(refs, f)

# infilenames and outfilenames are user-supplied, equal-length lists of
# input URLs and output JSON paths
tasks = [dask.delayed(process)(u, o)
         for u, o in zip(infilenames, outfilenames)]
dask.compute(tasks)
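
To get the additional CPUs and bandwidth of a cluster, as mentioned above, create a dask.distributed client before calling dask.compute; the worker count and scheduler address below are illustrative.

from dask.distributed import Client

client = Client(n_workers=8)   # local cluster; or Client("tcp://scheduler:8786") for a remote one
dask.compute(tasks)            # the tasks now run on the cluster's workers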

Tree reduction

In some cases, the combine process can itself be slow or memory hungry. In such cases, it is useful to combine the single-file reference sets in batches (which removes a lot of the redundancy between them) and then combine the results of the batches. This technique is known as tree reduction; a sketch of doing it by hand follows.
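
The following is a minimal sketch of a manual tree reduction, assuming you already have a list single_ref_sets of single-file reference sets and a dataset concatenated along a hypothetical "time" dimension; the batch size is illustrative.

from kerchunk.combine import MultiZarrToZarr

batch_size = 10
batches = [
    single_ref_sets[i:i + batch_size]
    for i in range(0, len(single_ref_sets), batch_size)
]

# first stage: combine each batch of single-file reference sets
intermediate = [
    MultiZarrToZarr(batch, concat_dims=["time"]).translate()
    for batch in batches
]

# second stage: combine the intermediate results into the final reference set
final = MultiZarrToZarr(intermediate, concat_dims=["time"]).translate()

Each first-stage combine is independent, so it can be wrapped in dask.delayed exactly as in the previous example.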

We also provide kerchunk.combine.auto_dask() as a convenience. This function is a one-stop call that processes the individual inputs, combines them in batches, and then combines the results of those batches into a final reference set.

The auto_dask function takes a number of dicts as arguments; users should consult the docstrings of the specific class which decodes the input files, and also of kerchunk.combine.MultiZarrToZarr, to see which keys each dict accepts. Note that any “preprocessing” for MultiZarrToZarr will be performed before the batch stage, and any “postprocessing” only after the final combine.
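
As a rough sketch of how a call might look (the argument names and values shown here are assumptions and should be checked against the docstring of your installed version; the URLs, driver options and concat dimension are placeholders):

import kerchunk.hdf
from kerchunk.combine import auto_dask

final_refs = auto_dask(
    urls,                                           # list of input file URLs
    single_driver=kerchunk.hdf.SingleHdf5ToZarr,    # class used to scan each input
    single_kwargs={"storage_options": {"anon": True}},
    mzz_kwargs={"concat_dims": ["time"]},
    n_batches=10,                                   # number of first-stage combines
)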

Archive Files

It is often convenient to distribute datasets by wrapping multiple files into an archive, ZIP or TAR. If those files are of formats supported by kerchunk, they can be directly scanned with something like

import kerchunk.netCDF3

transformer = kerchunk.netCDF3.NetCDF3ToZarr(
    "tar://myfile.nc::file://archive.tar",
    inline_threshold=0
)
out = transformer.translate()

where “myfile.nc” is a member file of the local archive.
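
If you do not know the member names in advance, you can list the archive with fsspec; "archive.tar" is the illustrative local file from above.

import fsspec

tarfs = fsspec.filesystem("tar", fo="archive.tar")
print(tarfs.ls("", detail=False))  # e.g. ["myfile.nc", ...]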

Note

We have turned off inlining here; it can be done later with kerchunk.utils.do_inline(). Support for inlining during the initial scan of archive members will come later.

At this point, the generated references contain URLs of the form “tar://myfile.nc::file://archive.tar”, which are problematic for loading, so we transform them to point to byte ranges in the original tar file instead, and then transform back to nominal form, ready to use. We may automate these steps in the future.

import kerchunk.utils

out2 = kerchunk.utils.dereference_archives(out)
# optional: out2 = kerchunk.utils.do_inline(out2, 100)
final = kerchunk.utils.consolidate(out2)

Now the references all point at “file://archive.tar”, and the reference set can be used directly or in further combining.

Warning

For ZIP archives, only uncompressed members can be accessed this way.
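
As a quick check that the rewritten references work, the consolidated set can be opened directly with fsspec and xarray; this is a sketch which assumes the archive member translates to a dataset xarray can open via the zarr engine.

import fsspec
import xarray as xr

m = fsspec.get_mapper("reference://", fo=final, remote_protocol="file")
ds = xr.open_dataset(m, engine="zarr", backend_kwargs={"consolidated": False})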

Parquet Storage

JSON is very convenient as a storage format for references, because it is simple, human-readable and ubiquitously supported. However, it is not the most efficient in terms of storage size or parsing speed. For Python in particular, it comes with the added downside that repeated strings become separate Python string instances, greatly inflating the memory footprint at load time.

To overcome these problems, and in particular keep down the memory use for the end-user of kerchunked data, we can convert references to be stored in parquet, and use them with fsspec.implementations.reference.ReferenceFileSystem, an alternative new implementation designed to work only with parquet input.

The principal benefits of the parquet path are:

  • much more compact storage, typically 2x smaller than compressed JSON or 10x smaller than uncompressed

  • correspondingly faster instantiation of a filesystem, since much of that time is taken by loading in the bytes of the references

  • smaller in-memory size (e.g., a Python int requires 28 bytes, but an int in an array needs only 4 or 8)

  • optional lazy loading, by partitioning the references into files by key; only the variables you actually access need to have their references loaded

  • optional dictionary encoding of URLs in the case that there are many repeated URLs (many references per target file). In this format, each unique URL is only stored in memory once.

The only access point to the new parquet storage is kerchunk.df.refs_to_dataframe(), which transforms an existing kerchunk reference set (in memory as dicts or in a JSON file) to parquet. Careful reading of the docstring is recommended, to understand the options. More options may be added.
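
For example, converting a reference set already saved as JSON might look like the following ("combined.json" and "combined.parq" are illustrative paths, and record_size is just one of the available options):

from kerchunk import df

df.refs_to_dataframe("combined.json", "combined.parq", record_size=100_000)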

Note

For now, kerchunk.combine.MultiZarrToZarr only operates on JSON/dict input. Therefore, refs_to_dataframe can only be used on the final output reference set. For a very large merge of many/large inputs, this may mean that the combine step requires a lot of memory, as will converting the output to parquet. However, the end-user should be able to access data via these references with much smaller memory requirements.

A concrete workflow may be something like the following. Note that kerchunk.combine.auto_dask() can execute the first three stages in one go and may be faster, if you have a Dask cluster available.

import os

import fsspec
import fsspec.implementations.reference
from fsspec.implementations.reference import LazyReferenceMapper
import xarray as xr

from kerchunk import hdf, combine, df

# location_of_data is a glob pattern for the input files, e.g. "s3://bucket/path/*.nc"
files = fsspec.filesystem("s3", anon=True).glob(location_of_data)

# Create LazyReferenceMapper to pass to MultiZarrToZarr
fs = fsspec.filesystem("file")
os.makedirs("combined.parq")
out = LazyReferenceMapper.create(record_size=1000, root="combined.parq", fs=fs)

# Create references from input files
single_ref_sets = [
    hdf.SingleHdf5ToZarr("s3://" + f, storage_options={"anon": True}).translate()
    for f in files
]

out_dict = combine.MultiZarrToZarr(
    single_ref_sets,
    remote_protocol="s3",
    concat_dims=["time"],
    remote_options={"anon": True},
    out=out,
).translate()

out.flush()

df.refs_to_dataframe(out_dict, "combined.parq")

fs = fsspec.implementations.reference.ReferenceFileSystem(
    "combined.parq", remote_protocol="s3", target_protocol="file", lazy=True)
ds = xr.open_dataset(
    fs.get_mapper(), engine="zarr",
    backend_kwargs={"consolidated": False}
)

At this point, xarray has loaded the metadata and coordinates only, so the main reference files corresponding to the data variables have not been touched. Even for a very large reference set, the memory use at this point should be <500MB.

As you access the variables of ds, they will be loaded on demand and cached. If using dask, workers will also only load those references they need.
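
For example, opening the store with chunks gives dask-backed variables, so only the reference partitions a computation actually touches are read; the chunk sizes and variable name below are hypothetical.

ds = xr.open_dataset(
    fs.get_mapper(), engine="zarr",
    backend_kwargs={"consolidated": False},
    chunks={"time": 100},
)
ds["some_variable"].mean().compute()  # loads only the references it needs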