Source code for kerchunk.df

import base64
import logging

import ujson
import pandas as pd
import fsspec
import zarr


# example from preffs's README
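# Each row maps a key to either a byte range in an external file
# (path/offset/size) or to inline bytes (raw); a repeated key indicates that
# its parts are concatenated in order.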
df = pd.DataFrame(
    {
        "key": ["a/b", "a/b", "b"],
        "path": ["a.dat", "b.dat", None],
        "offset": [123, 12, 0],
        "size": [12, 17, 0],
        "raw": [None, None, b"data"],
    }
)

logger = logging.getLogger("kerchunk.df")


def _proc_raw(r):
    if not isinstance(r, bytes):
        r = r.encode()
    if r.startswith(b"base64:"):
        return base64.b64decode(r[7:])
    return r
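

# Illustrative sketch (hypothetical helper, not part of kerchunk): string
# references are encoded to bytes, and a "base64:" prefix marks inline
# base64-encoded payloads.
def _example_proc_raw():
    assert _proc_raw("base64:aGVsbG8=") == b"hello"
    assert _proc_raw(b"plain bytes") == b"plain bytes"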


def get_variables(refs, consolidated=True):
    """Get list of variable names from references.

    Finds the top-level prefixes in a reference set, corresponding to
    the directory listing of the root for zarr.

    Parameters
    ----------
    refs : dict
        Kerchunk references (mapping of key to reference).
    consolidated : bool
        Whether to add a consolidated-metadata (".zmetadata") key to the
        references (default True).

    Returns
    -------
    fields : list of str
        List of variable names.
    """
    fields = []
    meta = {}
    for k in refs:
        if ".z" in k and consolidated:
            meta[k] = refs[k]
        if "/" in k:
            name, chunk = k.split("/")
            if name not in fields:
                fields.append(name)
            else:
                continue
        else:
            fields.append(k)
    if consolidated and ".zmetadata" not in fields:
        zarr.consolidate_metadata(meta)
        refs[".zmetadata"] = ujson.loads(meta[".zmetadata"])
        fields.append(".zmetadata")
    return fields
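

# Illustrative sketch (hypothetical references, not part of kerchunk): the
# variable names are the prefixes before "/" plus any top-level keys; with
# ``consolidated=False`` no consolidated metadata is generated.
def _example_get_variables():
    refs = {
        ".zgroup": '{"zarr_format": 2}',
        "temp/.zarray": "{...}",
        "temp/0.0": ["s3://bucket/file.nc", 0, 100],
    }
    assert get_variables(refs, consolidated=False) == [".zgroup", "temp"]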


def _normalize_json(json_obj):
    """Normalize json representation as bytes

    Parameters
    ----------
    json_obj : str, bytes, dict, list
        JSON data for parquet file to be written.
    """
    if not isinstance(json_obj, str) and not isinstance(json_obj, bytes):
        json_obj = ujson.dumps(json_obj)
    if not isinstance(json_obj, bytes):
        json_obj = json_obj.encode()
    return json_obj
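
# For example, _normalize_json({"a": 1}), _normalize_json('{"a":1}') and
# _normalize_json(b'{"a":1}') all yield b'{"a":1}' (ujson emits compact JSON).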


def _write_json(fname, json_obj, storage_options=None):
    """Write references into a parquet file.

    Parameters
    ----------
    fname : str
        Output filename.
    json_obj : str, bytes, dict, list
        JSON data for parquet file to be written.
    """
    storage_options = {} if storage_options is None else storage_options
    json_obj = _normalize_json(json_obj)
    with fsspec.open(fname, "wb", **storage_options) as f:
        f.write(json_obj)


def refs_to_dataframe(
    fo,
    url,
    target_protocol=None,
    target_options=None,
    storage_options=None,
    record_size=100_000,
    categorical_threshold=10,
):
    """Write references as a store of parquet files.

    The directory structure should mimic a normal zarr store but instead of
    standard chunk keys, references are saved as parquet dataframes.

    Parameters
    ----------
    fo : str | dict
        Location of a JSON file containing references or a reference set
        already loaded into memory.
    url : str
        Location for the output, together with protocol. This must be a
        writable directory.
    target_protocol : str
        Used for loading the reference file, if it is a path. If None,
        protocol will be derived from the given path.
    target_options : dict
        Extra FS options for loading the reference file ``fo``, if given
        as a path.
    storage_options : dict | None
        Passed to fsspec for writing the parquet.
    record_size : int
        Number of references to store in each reference file (default
        100_000). Bigger values mean fewer read requests but larger memory
        footprint.
    categorical_threshold : int
        Encode urls as pandas.Categorical to reduce memory footprint if the
        ratio of the number of unique urls to total number of refs for each
        variable is greater than or equal to this number. (default 10)
    """
    from fsspec.implementations.reference import LazyReferenceMapper

    if isinstance(fo, str):
        # JSON file
        dic = dict(**(target_options or {}), protocol=target_protocol)
        with fsspec.open(fo, "rb", **dic) as f:
            logger.info("Read reference from URL %s", fo)
            refs = ujson.load(f)
    else:
        # Mapping object
        refs = fo
    if "refs" in refs:
        refs = refs["refs"]

    fs, _ = fsspec.core.url_to_fs(url, **(storage_options or {}))
    out = LazyReferenceMapper.create(
        record_size=record_size,
        root=url,
        fs=fs,
        categorical_threshold=categorical_threshold,
    )

    for k in sorted(refs):
        out[k] = refs[k]
    out.flush()
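

# Usage sketch (file names, protocol and options below are placeholders, not
# part of this module): convert a JSON reference set to the parquet layout,
# then open it again through fsspec's "reference" filesystem, which understands
# the LazyReferenceMapper directory structure.
def _example_refs_to_dataframe():
    refs_to_dataframe("combined.json", "refs.parq", record_size=100_000)
    fs = fsspec.filesystem(
        "reference",
        fo="refs.parq",
        remote_protocol="s3",
        remote_options={"anon": True},
    )
    return zarr.open(fs.get_mapper(""), mode="r")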