Tutorial
This run-through tutorial is intended to demonstrate a variety of methods for combining datasets using the kerchunk.combine.MultiZarrToZarr API.
You can find additional tutorial materials in the Project Pythia Kerchunk Cookbook.
Initially we create a pair of single-file JSONs for two ERA5 variables using kerchunk.hdf.SingleHdf5ToZarr. This ERA5 dataset is free to access, so it is possible to replicate this workflow on a local machine without credentials.
Single file JSONs
The kerchunk.hdf.SingleHdf5ToZarr method is used to create a single .json reference file for each file URL passed to it. Here we use it to create a number of reference files for the ERA5 public dataset on AWS, covering several different times and variables so that we can demonstrate the different methods of combining them.
The Kerchunk package is still in a development phase and so changes frequently. Installing directly from the source code is recommended.
!pip install git+https://github.com/fsspec/kerchunk
Here we are working with NetCDF4 files and so use the kerchunk.hdf module. Support for fits, grib2, tiff, netCDF3 and zarr is available in other kerchunk modules. Alternatively, it is also possible to manually create reference JSONs for more specific cases; the Earth Big Data example provides a demonstration of this.
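For orientation, all of these approaches produce the same reference structure: a JSON mapping of zarr keys to either inline metadata or [url, offset, length] pointers into the original files. A minimal hand-written sketch of that structure (all names and byte ranges here are hypothetical) might look like:
# A hypothetical, hand-written version-1 reference set
manual_refs = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        # zarr metadata entries are stored as JSON-encoded strings
        "temperature/.zarray": '{"shape": [10, 10], "chunks": [10, 10], "dtype": "<f4", "compressor": null, "filters": null, "fill_value": null, "order": "C", "zarr_format": 2}',
        # chunk keys point at [url, byte offset, length] within the original file
        "temperature/0.0": ["s3://my-bucket/original_file.nc", 4096, 400],
    },
}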
from kerchunk.hdf import SingleHdf5ToZarr
import fsspec
Using fsspec to create a Pythonic filesystem provides a convenient way to manage file URLs.
The SingleHdf5ToZarr method takes both an h5f file and a url as input. The h5f argument can either be a binary Python file-like object or a URL, in which case it will be opened using fsspec and storage_options. The url input is not used to open the file; it exists to allow the user to compute the reference files on data before it is uploaded to its final storage location. Thus the url input should be the URL of the final file destination, not the current location.
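For example, if a file is currently on local disk but will eventually be uploaded to S3, the call might look like the following sketch (both paths are hypothetical):
# reference a local copy, but record the eventual S3 location in the output
with open('air_pressure_at_mean_sea_level.nc', 'rb') as local_file:
    refs = SingleHdf5ToZarr(
        local_file,
        's3://my-bucket/era5/air_pressure_at_mean_sea_level.nc',  # final destination URL
    ).translate()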
fs = fsspec.filesystem('s3', anon=True)  # S3 filesystem to manage ERA5 files
flist = (fs.glob('s3://era5-pds/2020/*/data/air_pressure_at_mean_sea_level.nc')[:2]
         + fs.glob('s3://era5-pds/2020/*/data/*sea_surface_temperature.nc')[:2])
fs2 = fsspec.filesystem('')  # local file system to save final jsons to
from pathlib import Path
import os
import ujson
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first') # args to fs.open()
# default_fill_cache=False avoids caching data between file chunks, to lower memory usage.
def gen_json(file_url):
    with fs.open(file_url, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, file_url, inline_threshold=300)
        # inline_threshold adjusts the size below which binary blocks are included directly in the output
        # a higher inline threshold can result in a larger json file but faster loading time
        variable = file_url.split('/')[-1].split('.')[0]
        month = file_url.split('/')[2]
        outf = f'{month}_{variable}.json'  # file name to save json to
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode());
ERA5-pds is located in us-west-2, so the time taken can vary dramatically depending on where this computation is run.
%%time
for file in flist:
gen_json(file)
CPU times: user 30.4 s, sys: 4.74 s, total: 35.1 s
Wall time: 14min 44s
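Since each file is processed independently, the generation step can also be parallelized to reduce the wall time; a minimal sketch using only the standard library (a Dask bag or cluster would work equally well):
from concurrent.futures import ThreadPoolExecutor

# each task is dominated by S3 I/O, so a small thread pool is usually sufficient
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(gen_json, flist))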
The .json reference files we have generated can now be used to open virtual datasets through xarray or zarr. It is necessary to specify the location of the reference json file, passed here via the fo key (with target_options if the reference file itself needs special access options), and to describe the source data using the remote_options and remote_protocol arguments. Here we specify that the source data is stored on AWS S3 and can be accessed anonymously.
import xarray as xr
%%time
ds = xr.open_dataset("reference://", engine="zarr", backend_kwargs={
    "consolidated": False,
    "storage_options": {"fo": '01_air_pressure_at_mean_sea_level.json', "remote_protocol": "s3", "remote_options": {"anon": True}}
})
print(ds)
<xarray.Dataset>
Dimensions: (time0: 744, lat: 721, lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 ... -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 ... 359.5 359.8
* time0 (time0) datetime64[ns] 2020-01-01 ... 202...
Data variables:
air_pressure_at_mean_sea_level (time0, lat, lon) float32 dask.array<chunksize=(24, 100, 100), meta=np.ndarray>
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
CPU times: user 162 ms, sys: 17.4 ms, total: 180 ms
Wall time: 235 ms
Combine multiple kerchunked datasets into a single logical aggregate dataset
The kerchunk.combine.MultiZarrToZarr method combines the .json reference files generated above to create a single virtual dataset, such that one reference file maps to all of the chunks in the individual files.
from kerchunk.combine import MultiZarrToZarr
MultiZarrToZarr provides a number of convenient ways to combine reference files. The simplest is to concatenate along a specified dimension using the concat_dims argument, time0 in this instance. Variables that should not be concatenated (because they are not a function of the concat coordinates) should be listed in the identical_dims argument.
json_list = fs2.glob("*_air_pressure_at_mean_sea_level.json")
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      concat_dims=['time0'],
                      identical_dims=['lat', 'lon'])
d = mzz.translate()
with fs2.open('air_pressure_at_mean_sea_level_combined.json', 'wb') as f:
    f.write(ujson.dumps(d).encode())
The reference json we have just generated can now be opened to reveal a single virtual dataset spanning both the input files, with little to no latency.
%%time
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (time0: 1440, lat: 721, lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 ... -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 ... 359.5 359.8
* time0 (time0) datetime64[ns] 2020-01-01 ... 202...
Data variables:
air_pressure_at_mean_sea_level (time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
CPU times: user 42.3 ms, sys: 128 µs, total: 42.5 ms
Wall time: 40.9 ms
Using coo_map
When the dimension along which we would like to concatenate is not already in the dataset, or when combining datasets from across an ensemble, we can use the coo_map argument to create a new dimension.
new_dims = ['a', 'b']
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      coo_map={'new_dimension': new_dims},
                      concat_dims=['new_dimension'],
                      identical_dims=['lat', 'lon']
                      )
d = mzz.translate()
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (new_dimension: 2, time0: 744, lat: 721,
lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 ... -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 ... 359.5 359.8
* new_dimension (new_dimension) object 'a' 'b'
* time0 (time0) datetime64[ns] 2020-01-01 ... 202...
Data variables:
air_pressure_at_mean_sea_level (new_dimension, time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
Here, by providing a list of literal values to coo_map, we created new_dimension.
For more complex uses it is also possible to pass in a compiled regex that operates on the input file URLs to generate a unique value for each file.
import re
ex = re.compile(r'.*(\d+)_air')
print(ex.match(json_list[0]).groups()[0])
'1'
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      coo_map={'new_dimension': ex},
                      concat_dims=['new_dimension'],
                      identical_dims=['lat', 'lon']
                      )
d = mzz.translate()
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (new_dimension: 2, time0: 744, lat: 721,
lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 ... -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 ... 359.5 359.8
* new_dimension (new_dimension) object '1' '2'
* time0 (time0) datetime64[ns] 2020-01-01 ... 202...
Data variables:
air_pressure_at_mean_sea_level (new_dimension, time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
Here the new_dimension values have been populated by the compiled regex ex, which takes the file URLs as input.
To extract time information from file names, a custom function of the form (index, fs, var, fn) -> value can be defined to return a valid datetime.datetime, typically using regular expressions. The resulting datetime objects are used to populate the time coordinate, with its on-disk dtype controlled through the coo_dtypes argument of MultiZarrToZarr.
Here’s an example for file names following the pattern cgl_TOC_YYYYmmddHHMM_X21Y05_S3A_v1.1.0.json:
import glob
import numpy as np

def fn_to_time(index, fs, var, fn):
    import re
    import datetime
    # extract the 12-digit timestamp from the file name and parse it
    subst = re.search(r"\d{12}", fn)[0]
    return datetime.datetime.strptime(subst, '%Y%m%d%H%M')

mzz = MultiZarrToZarr(sorted(glob.iglob(r'*.json')),
                      remote_protocol='file',
                      coo_map={'time': fn_to_time},
                      coo_dtypes={'time': np.dtype('M8[s]')},
                      concat_dims=['time'],
                      identical_dims=['lat', 'lon'],
                      )
d = mzz.translate()
# open the combined reference as before (the underlying files are local this time)
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "file"}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (time: 4, lat: 3360, lon: 3360)
Coordinates:
* lat (lat) float64 35.0 35.0 34.99 ... 25.01 25.01 25.0
* lon (lon) float64 30.0 30.0 30.01 ... 39.99 39.99 40.0
* time (time) datetime64[s] 2022-10-11T07:01:00 ... 2022-10...
Data variables: (12/53)
AC_process_flag (time, lat, lon) float32 dask.array<chunksize=(1, 168, 168), meta=np.ndarray>
Oa02_toc (time, lat, lon) float32 dask.array<chunksize=(1, 168, 168), meta=np.ndarray>
... ...
VZA_olci (time, lat, lon) float32 dask.array<chunksize=(1, 168, 168), meta=np.ndarray>
Attributes: (12/17)
Conventions: CF-1.6
archive_facility: vito
copyright: Copernicus Service information 2022
Similarly, we can map each file to a new variable using the special var key in coo_map. Here we use the same compiled regex, but instead map the matched values to new variable names.
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      coo_map={"var": ex},
                      concat_dims=['time0'],
                      identical_dims=['lat', 'lon']
                      )
d = mzz.translate()
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (time0: 1440, lat: 721, lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 89.5 89.25 ... -89.25 -89.5 -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 0.75 1.0 ... 359.0 359.2 359.5 359.8
* time0 (time0) datetime64[ns] 2020-01-01 ... 2020-02-29T23:00:00
Data variables:
1 (time0, lat, lon) float32 ...
2 (time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
Another special key in coo_map is attr:, which allows the user to access values from each dataset’s global attributes.
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      coo_map={"var": "attr:institution"},
                      concat_dims=['time0'],
                      identical_dims=['lat', 'lon']
                      )
d = mzz.translate()
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (time0: 1440, lat: 721, lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 89.5 89.25 ... -89.25 -89.5 -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 0.75 1.0 ... 359.0 359.2 359.5 359.8
* time0 (time0) datetime64[ns] 2020-01-01 ... 2020-02-29T23:00:00
Data variables:
ECMWF (time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
The special value vattr:{var}:{attr} allows access to variable attributes. Here we rename the variable to use its short name instead.
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      coo_map={"var": "vattr:air_pressure_at_mean_sea_level:shortNameECMWF"},
                      concat_dims=['time0'],
                      identical_dims=['lat', 'lon']
                      )
d = mzz.translate()
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (lat: 721, lon: 1440, time0: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 89.5 89.25 ... -89.25 -89.5 -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 0.75 1.0 ... 359.0 359.2 359.5 359.8
* time0 (time0) datetime64[ns] 2020-01-01 ... 2020-02-29T23:00:00
Data variables:
msl (time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
There are a number of other special values for coo_map, documented in the API reference.
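For example, per the API reference, the special value "INDEX" maps each input file to its running position in the input list, which can be a quick way to build an ensemble-style dimension; a minimal sketch:
# "INDEX" assigns each input json its position in json_list as the coordinate value
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      coo_map={'member': 'INDEX'},
                      concat_dims=['member'],
                      identical_dims=['lat', 'lon'])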
Merging variables across jsons
The kerchunk.combine.merge_vars convenience function can be used to merge variables across datasets when we know that the coordinates and global file attributes are identical.
from kerchunk.combine import merge_vars
json_list = fs2.glob("01_sea_surface_temperature.json") + fs2.glob("01_air_pressure_at_mean_sea_level.json")
d = merge_vars(json_list)
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (time0: 744, lat: 721, lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 ... -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 ... 359.5 359.8
* time0 (time0) datetime64[ns] 2020-01-01 ... 202...
Data variables:
air_pressure_at_mean_sea_level (time0, lat, lon) float32 ...
sea_surface_temperature (time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
Preprocessing
The preprocess argument can be used to apply an arbitrary function to the refs entry of each input json before combining. In this case we use it to drop the air_pressure_at_mean_sea_level variable before combining sea_surface_temperature with a json containing data for the following month.
def pre_process(refs):
    for k in list(refs):
        if k.startswith('air_pressure_at_mean_sea_level'):
            refs.pop(k)
    return refs
json_list = fs2.glob("vars_combined.json") + fs2.glob("02_sea_surface_temperature.json")
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      concat_dims=['time0'],
                      identical_dims=['lat', 'lon'],
                      preprocess=pre_process)
d = mzz.translate()
with fs2.open('sea_surface_temperature_combined.json', 'wb') as f:
    f.write(ujson.dumps(d).encode())
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (lat: 721, lon: 1440, time0: 696)
Coordinates:
* lat (lat) float32 90.0 89.75 89.5 ... -89.75 -90.0
* lon (lon) float32 nan 0.25 0.5 ... 359.2 359.5 359.8
* time0 (time0) datetime64[ns] 2020-02-01 ... 2020-02-29...
Data variables:
sea_surface_temperature (time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
Postprocessing
Similarly, the postprocess argument can be used to apply an arbitrary function to the final reference dictionary before it is returned. A known issue with this particular dataset is that no fill value has been assigned to the lat and lon coordinates, so it defaults to 0 and the genuine 0.0 longitude value is decoded as NaN (visible in the outputs above). Here we use postprocess to change the zarr fill_value attribute by opening the final reference set as a zarr store.
Changing the fill values could also be achieved by editing the final json through string manipulation, or even a simple find-and-replace in an IDE.
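For illustration, a rough sketch of that manual alternative is shown below; it assumes the combined reference has already been written to a file named combined.json (the remainder of this section uses the postprocess route instead):
# edit the fill_value entries of the coordinate .zarray metadata in place
with open('combined.json') as f:
    refs = ujson.load(f)
for coord in ('lat', 'lon'):
    zarray = ujson.loads(refs['refs'][f'{coord}/.zarray'])  # metadata values are JSON-encoded strings
    zarray['fill_value'] = -999
    refs['refs'][f'{coord}/.zarray'] = ujson.dumps(zarray)
with open('combined.json', 'w') as f:
    ujson.dump(refs, f)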
import zarr

def modify_fill_value(out):
    out_ = zarr.open(out)
    out_.lon.fill_value = -999
    out_.lat.fill_value = -999
    return out

def postprocess(out):
    out = modify_fill_value(out)
    return out
json_list = fs2.glob("air_pressure_at_mean_sea_level_combined.json") + fs2.glob("sea_surface_temperature_combined.json")
mzz = MultiZarrToZarr(json_list,
                      remote_protocol='s3',
                      remote_options={'anon': True},
                      concat_dims=['time0'],
                      identical_dims=['lat', 'lon'],
                      postprocess=postprocess)
d = mzz.translate()
with fs2.open('combined.json', 'wb') as f:
    f.write(ujson.dumps(d).encode())
backend_args = {"consolidated": False, "storage_options": {"fo": d, "remote_protocol": "s3","remote_options": {"anon": True}}}
print(xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args))
<xarray.Dataset>
Dimensions: (time0: 1440, lat: 721, lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 ... -89.75 -90.0
* lon (lon) float32 0.0 0.25 0.5 ... 359.5 359.8
* time0 (time0) datetime64[ns] 2020-01-01 ... 202...
Data variables:
air_pressure_at_mean_sea_level (time0, lat, lon) float32 ...
sea_surface_temperature (time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
Using the output
To open a previously computed reference dataset it is not necessary to have kerchunk installed; only fsspec is needed to generate the file mapping.
Here we open a remotely stored reference file that maps to 10 ERA5 variables across a 43 year time span.
The sidecar file has been compressed using zstd, from the original 1.8GB down to 194MB. Opening this virtual dataset requires around 7GB of free system memory.
A smaller file containing only 2 years of data is available at: s3://esip-qhub-public/ecmwf/ERA5_2020_2022_multivar.json.zst
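For reference, a compressed sidecar file like this can be written directly through fsspec's compression support; a minimal sketch, assuming the zstandard package is installed and d holds a combined reference dict:
# write the combined reference dict as a zstd-compressed json
with fsspec.open('combined.json.zst', 'wb', compression='zstd') as f:
    f.write(ujson.dumps(d).encode())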
%%time
fs = fsspec.filesystem('reference', fo='s3://esip-qhub-public/ecmwf/ERA5_1979_2022_multivar.json.zst',
                       target_options={'compression': 'zstd', 'anon': True},
                       remote_protocol='s3', remote_options={'anon': True})
m = fs.get_mapper('')
ds = xr.open_dataset(m, engine='zarr', backend_kwargs={'consolidated':False})
print(ds)
<xarray.Dataset>
Dimensions: (time0: 380568, lat: 721, lon: 1440)
Coordinates:
* lat (lat) float32 90.0 89.75 ... -90.0
* lon (lon) float32 0.0 0.25 ... 359.5 359.8
* time0 (time0) datetime64[ns] 1979-01-01 ....
Data variables:
air_pressure_at_mean_sea_level (time0, lat, lon) float32 ...
air_temperature_at_2_metres (time0, lat, lon) float32 ...
dew_point_temperature_at_2_metres (time0, lat, lon) float32 ...
eastward_wind_at_100_metres (time0, lat, lon) float32 ...
eastward_wind_at_10_metres (time0, lat, lon) float32 ...
lwe_thickness_of_surface_snow_amount (time0, lat, lon) float32 ...
northward_wind_at_100_metres (time0, lat, lon) float32 ...
sea_surface_temperature (time0, lat, lon) float32 ...
Attributes:
institution: ECMWF
source: Reanalysis
title: ERA5 forecasts
CPU times: user 48.8 s, sys: 5.61 s, total: 54.4 s
Wall time: 1min 8s
The script above required to open the reference dataset is rather complex. For this reason it is suggested to instead hide it away in an intake catalog, so that all that is required to open the dataset is the following:
import intake
catalog = intake.open_catalog('s3://esip-qhub-public/ecmwf/intake_catalog.yml')
list(catalog)
['ERA5-Kerchunk-1979-2022', 'ERA5-Kerchunk-2020-2022']
ds = catalog['ERA5-Kerchunk-1979-2022'].to_dask()
Multiple datasets can be managed in a single intake catalog, which can therefore serve as a one-stop shop containing all the datasets available to a group of users.
Once the referenced dataset is loaded it can be operated on just like any other lazy xarray dataset.
%%time
da = ds.sel(time0 = '2021-01-01T00:00:00')
da['air_pressure_at_mean_sea_level'].plot()
CPU times: user 3.79 s, sys: 382 ms, total: 4.18 s
Wall time: 6.22 s
%%time
da = ds.sel(lat = -34).sel(lon = 198)
da.air_temperature_at_2_metres.sel(time0 = slice('2000-01-01','2000-12-31')).plot()
CPU times: user 9.92 s, sys: 663 ms, total: 10.6 s
Wall time: 16.5 s