Source code for kerchunk.utils

import base64
import copy
import itertools
import warnings

import ujson

import fsspec
import zarr


def class_factory(func):
    """Experimental uniform API across function-based file scanners"""

    class FunctionWrapper:
        __doc__ = func.__doc__
        __module__ = func.__module__

        def __init__(self, url, storage_options=None, inline_threshold=100, **kwargs):
            self.url = url
            self.storage_options = storage_options
            self.inline = inline_threshold
            self.kwargs = kwargs

        def translate(self):
            return func(
                self.url,
                inline_threshold=self.inline,
                storage_options=self.storage_options,
                **self.kwargs,
            )

        def __str__(self):
            return f"<Single file to zarr processor using {func.__module__}.{func.__qualname__}>"

        __repr__ = __str__

    return FunctionWrapper
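
# Usage sketch (illustrative only): wrap a function-based scanner so it exposes
# a class-style ``.translate()`` API. ``scan_myformat`` is a hypothetical
# single-file scanner with the (url, storage_options=..., inline_threshold=...)
# signature that ``class_factory`` expects.
#
#   MyFormatToZarr = class_factory(scan_myformat)
#   refs = MyFormatToZarr(
#       "s3://bucket/data.bin", storage_options={"anon": True}
#   ).translate()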


def consolidate(refs):
    """Turn raw references into a JSON-serialisable version 1 reference set"""
    out = {}
    for k, v in refs.items():
        if isinstance(v, bytes):
            try:
                # easiest way to test if data is ascii
                out[k] = v.decode("ascii")
            except UnicodeDecodeError:
                out[k] = (b"base64:" + base64.b64encode(v)).decode()
        else:
            out[k] = v
    return {"version": 1, "refs": out}
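
# Example (minimal sketch with illustrative raw references): bytes values that
# are not plain ASCII are base64-encoded, and the result is wrapped as a
# version 1 reference set.
#
#   >>> raw = {".zgroup": b'{"zarr_format": 2}', "x/0": b"\x00\x01\xff"}
#   >>> consolidate(raw)
#   {'version': 1, 'refs': {'.zgroup': '{"zarr_format": 2}', 'x/0': 'base64:AAH/'}}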


def rename_target(refs, renames):
    """Utility to change URLs in a reference set in a predictable way

    For reference sets including templates, this is more easily done by
    using template overrides at access time; but rewriting the references
    and saving a new file means not having to do that every time.

    Parameters
    ----------
    refs: dict
        Reference set
    renames: dict[str, str]
        Mapping from the old URL (including protocol, if this is how they appear
        in the original) to new URL

    Returns
    -------
    dict: the altered reference set, which can be saved
    """
    fs = fsspec.filesystem("reference", fo=refs)  # to produce normalised refs
    refs = fs.references
    out = {}
    for k, v in refs.items():
        if isinstance(v, list) and v[0] in renames:
            out[k] = [renames[v[0]]] + v[1:]
        else:
            out[k] = v
    return consolidate(out)
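
# Example (sketch; the URLs are hypothetical): point every reference that
# targets the old location at the new one, leaving offsets and sizes untouched.
#
#   new_refs = rename_target(
#       refs, {"s3://old-bucket/data.nc": "s3://new-bucket/data.nc"}
#   )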


def rename_target_files(
    url_in, renames, url_out=None, storage_options_in=None, storage_options_out=None
):
    """Perform URL renames on a reference set - read and write from JSON

    Parameters
    ----------
    url_in: str
        Original JSON reference set
    renames: dict
        URL renamings to perform (see ``rename_target``)
    url_out: str | None
        Where to write to. If None, overwrites original
    storage_options_in: dict | None
        passed to fsspec for opening url_in
    storage_options_out: dict | None
        passed to fsspec for opening url_out. If None, storage_options_in is used.

    Returns
    -------
    None
    """
    with fsspec.open(url_in, **(storage_options_in or {})) as f:
        old = ujson.load(f)
    new = rename_target(old, renames)
    if url_out is None:
        url_out = url_in
    if storage_options_out is None:
        storage_options_out = storage_options_in
    with fsspec.open(url_out, mode="wt", **(storage_options_out or {})) as f:
        ujson.dump(new, f)
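
# Example (sketch; file names are hypothetical): apply the same renames to a
# reference set stored as JSON, writing the result to a new file.
#
#   rename_target_files(
#       "refs.json",
#       {"s3://old-bucket/data.nc": "s3://new-bucket/data.nc"},
#       url_out="refs_renamed.json",
#   )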


def _encode_for_JSON(store):
    """Make store JSON encodable"""
    for k, v in store.copy().items():
        if isinstance(v, list):
            continue
        else:
            try:
                # minify JSON
                v = ujson.dumps(ujson.loads(v))
            except (ValueError, TypeError):
                pass
            try:
                store[k] = v.decode() if isinstance(v, bytes) else v
            except UnicodeDecodeError:
                store[k] = "base64:" + base64.b64encode(v).decode()
    return store


def do_inline(store, threshold, remote_options=None, remote_protocol=None):
    """Replace short chunks with the value of that chunk

    The chunk may need encoding with base64 if not ascii, so actual
    length may be larger than threshold.
    """
    fs = fsspec.filesystem(
        "reference",
        fo=store,
        remote_options=remote_options,
        remote_protocol=remote_protocol,
    )
    out = fs.references.copy()
    get_keys = [
        k
        for k, v in out.items()
        if isinstance(v, list) and len(v) == 3 and v[2] < threshold
    ]
    values = fs.cat(get_keys)
    for k, v in values.items():
        try:
            # easiest way to test if data is ascii
            v.decode("ascii")
        except UnicodeDecodeError:
            v = b"base64:" + base64.b64encode(v)
        out[k] = v
    return out
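
# Example (sketch; remote settings are illustrative): fetch every referenced
# chunk smaller than 100 bytes and store its value directly in the reference set.
#
#   refs = do_inline(
#       refs, threshold=100, remote_protocol="s3", remote_options={"anon": True}
#   )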


def _inline_array(group, threshold, names, prefix=""):
    for name, thing in group.items():
        if prefix:
            prefix1 = f"{prefix}.{name}"
        else:
            prefix1 = name
        if isinstance(thing, zarr.Group):
            _inline_array(thing, threshold=threshold, prefix=prefix1, names=names)
        else:
            cond1 = threshold and thing.nbytes < threshold and thing.nchunks > 1
            cond2 = prefix1 in names
            if cond1 or cond2:
                original_attrs = dict(thing.attrs)
                arr = group.create_dataset(
                    name=name,
                    dtype=thing.dtype,
                    shape=thing.shape,
                    data=thing[:],
                    chunks=thing.shape,
                    compression=None,
                    overwrite=True,
                )
                arr.attrs.update(original_attrs)


def inline_array(store, threshold=1000, names=None, remote_options=None):
    """Inline whole arrays by threshold or name, replace with a single metadata chunk

    Inlining whole arrays results in fewer keys. If the constituent keys were
    already inlined, this also results in a smaller file overall. No action is
    taken for arrays that already consist of a single chunk.

    Parameters
    ----------
    store: dict/JSON file
        reference set
    threshold: int
        Size in bytes below which to inline. Set to 0 to prevent inlining by size
    names: list[str] | None
        If the array name (as a dotted full path) appears in this list, it will
        be inlined irrespective of the threshold size. Useful for coordinates.
    remote_options: dict | None
        Needed to fetch data, if the required keys are not already individually
        inlined in the data.

    Returns
    -------
    amended references set (simple style)
    """
    fs = fsspec.filesystem(
        "reference", fo=store, **(remote_options or {}), skip_instance_cache=True
    )
    g = zarr.open_group(fs.get_mapper(), mode="r+")
    _inline_array(g, threshold, names=names or [])
    return fs.references
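
# Example (sketch; array names are illustrative): inline arrays smaller than
# the threshold, plus the named coordinate arrays regardless of size.
#
#   refs = inline_array(refs, threshold=1000, names=["time", "lat", "lon"])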


def subchunk(store, variable, factor):
    """
    Split uncompressed chunks into integer subchunks on the largest axis

    Parameters
    ----------
    store: dict
        reference set
    variable: str
        the named zarr variable (give as /-separated path if deep)
    factor: int
        the number of chunks each input chunk turns into. Must be an exact
        divisor of the original largest dimension length.

    Returns
    -------
    modified store
    """
    fs = fsspec.filesystem("reference", fo=store)
    store = copy.deepcopy(store)
    meta_file = f"{variable}/.zarray"
    meta = ujson.loads(fs.cat(meta_file))
    if meta["compressor"] is not None:
        raise ValueError("Can only subchunk an uncompressed array")
    chunks_orig = meta["chunks"]
    if chunks_orig[0] % factor == 0:
        chunk_new = [chunks_orig[0] // factor] + chunks_orig[1:]
    else:
        raise ValueError("Must subchunk by exact integer factor")
    meta["chunks"] = chunk_new
    store[meta_file] = ujson.dumps(meta)

    for k, v in store.copy().items():
        if k.startswith(f"{variable}/"):
            kpart = k[len(variable) + 1 :]
            if kpart.startswith(".z"):
                continue
            sep = "." if "." in k else "/"
            chunk_index = [int(_) for _ in kpart.split(sep)]
            if len(v) > 1:
                url, offset, size = v
            else:
                (url,) = v
                offset = 0
                size = fs.size(k)
            for subpart in range(factor):
                new_index = [chunk_index[0] * factor + subpart] + chunk_index[1:]
                newpart = sep.join(str(_) for _ in new_index)
                newv = [url, offset + subpart * size // factor, size // factor]
                store[f"{variable}/{newpart}"] = newv
    return store
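
# Worked example (sketch; key names, URL and sizes are illustrative): with
# factor=2, a variable whose ".zarray" has chunks [10, 100] and a single
# 8000-byte chunk reference
#
#   {"temp/0.0": ["s3://bucket/file.dat", 0, 8000]}
#
# becomes chunks [5, 100] with two half-size references:
#
#   {"temp/0.0": ["s3://bucket/file.dat", 0, 4000],
#    "temp/1.0": ["s3://bucket/file.dat", 4000, 4000]}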


def dereference_archives(references, remote_options=None):
    """Directly point to uncompressed byte ranges in ZIP/TAR archives

    If a set of references have been made for files contained within ZIP or
    (uncompressed) TAR archives, the "zip://..." and "tar://..." URLs should
    be converted to byte ranges in the overall file.

    Parameters
    ----------
    references: dict
        a simple reference set
    remote_options: dict or None
        For opening the archives
    """
    import zipfile
    import tarfile

    if "version" in references and references["version"] == 1:
        references = references["refs"]

    target_files = [l[0] for l in references.values() if isinstance(l, list)]
    target_files = {
        (t.split("::", 1)[1], t[:3])
        for t in target_files
        if t.startswith(("tar://", "zip://"))
    }

    # find all member file offsets in all archives
    offsets = {}
    for target, tar_or_zip in target_files:
        with fsspec.open(target, **(remote_options or {})) as tf:
            if tar_or_zip == "tar":
                tar = tarfile.TarFile(fileobj=tf)
                offsets[target] = {
                    ti.name: {"offset": ti.offset_data, "size": ti.size, "comp": False}
                    for ti in tar.getmembers()
                    if ti.isfile()
                }
            elif tar_or_zip == "zip":
                zf = zipfile.ZipFile(file=tf)
                offsets[target] = {}
                for zipinfo in zf.filelist:
                    if zipinfo.is_dir():
                        continue
                    # if uncompressed, include only the buffer. If compressed (DEFLATE),
                    # include also the header, and must use DeflateCodec
                    if zipinfo.compress_type == zipfile.ZIP_DEFLATED:
                        # TODO: find relevant .zarray and add filter directly
                        header = 0
                        warnings.warn(
                            "ZIP file contains compressed files, must use DeflateCodec"
                        )
                        tail = len(zipinfo.FileHeader())
                    elif zipinfo.compress_type == zipfile.ZIP_STORED:
                        header = len(zipinfo.FileHeader())
                        tail = 0
                    else:
                        comp = zipfile.compressor_names[zipinfo.compress_type]
                        raise ValueError(
                            f"ZIP compression method not supported: {comp}"
                        )
                    offsets[target][zipinfo.filename] = {
                        "offset": zipinfo.header_offset + header,
                        "size": zipinfo.compress_size + tail,
                        "comp": zipinfo.compress_type != zipfile.ZIP_STORED,
                    }

    # modify references
    mods = copy.deepcopy(references)
    for k, v in mods.items():
        if not isinstance(v, list):
            continue
        target = v[0].split("::", 1)[1]
        infile = v[0].split("::", 1)[0][6:]  # strip "zip://" or "tar://"
        if target not in offsets:
            continue
        detail = offsets[target][infile]
        if detail["comp"]:
            # leave compressed member file alone
            pass
        v[0] = target
        if len(v) == 1:
            v.append(detail["offset"])
            v.append(detail["size"])
        else:
            v[1] += detail["offset"]
    return mods
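
# Example (sketch; the archive URL is hypothetical): references targeting
# "tar://member.nc::s3://bucket/archive.tar" are rewritten as plain byte
# ranges into "s3://bucket/archive.tar" itself.
#
#   refs = dereference_archives(refs, remote_options={"anon": True})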


def _max_prefix(*strings):
    # https://stackoverflow.com/a/6719272/3821154
    def all_same(x):
        return all(x[0] == y for y in x)

    char_tuples = zip(*strings)
    prefix_tuples = itertools.takewhile(all_same, char_tuples)
    return "".join(x[0] for x in prefix_tuples)


def templateize(strings, min_length=10, template_name="u"):
    """Make prefix template for a set of strings

    Useful for condensing strings by extracting out a common prefix.
    If the common prefix is shorter than ``min_length``, the original
    strings are returned and the output templates are empty.

    Parameters
    ----------
    strings: List[str]
        inputs
    min_length: int
        Only perform transform if the common prefix is at least this long.
    template_name: str
        The placeholder string, should be short.

    Returns
    -------
    templates: Dict[str, str], strings: List[str]
        Such that [s.format(**templates) for s in strings] recreates the
        original strings list
    """
    prefix = _max_prefix(*strings)
    lpref = len(prefix)
    if lpref >= min_length:
        template = {template_name: prefix}
        strings = [("{%s}" % template_name) + s[lpref:] for s in strings]
    else:
        template = {}
    return template, strings
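
# Example: a shared prefix at least ``min_length`` characters long is pulled
# out into a template that ``str.format`` can re-expand.
#
#   >>> templateize(["s3://bucket/path/file1.nc", "s3://bucket/path/file2.nc"])
#   ({'u': 's3://bucket/path/file'}, ['{u}1.nc', '{u}2.nc'])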