Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add ConsolidateDimensionCoordinates + ConsolidateZarrMetadata transforms #556

Closed
wants to merge 19 commits into from

Conversation

norlandrhagen
Copy link
Contributor

This PR is intended to kickstart issue #554. It lays down some of the groundwork outlined in @cisaacstern's recommendation under issue 544.

cc @abarciauskas-bgse @cisaacstern

Currently fails when running end_to_end tests.

pytest tests/test_end_to_end.py -k 'test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d]'
ValueError: Region (slice(1, 2, None),) does not align with Zarr chunks (10,). [while running 'StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']

Traceback:

pytest tests/test_end_to_end.py -k 'test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d]'
================================================================================================ test session starts ================================================================================================
platform darwin -- Python 3.10.12, pytest-7.4.0, pluggy-1.2.0
rootdir: /Users/nrhagen/Documents/carbonplan/pangeo_forge/pangeo-forge-recipes
configfile: setup.cfg
plugins: nbmake-1.4.1, timeout-2.1.0, cov-4.1.0, anyio-3.7.1, lazy-fixture-0.6.3
timeout: 30.0s
timeout method: signal
timeout func_only: False
collected 22 items / 21 deselected / 1 selected                                                                                                                                                                     

tests/test_end_to_end.py FE                                                                                                                                                                                   [100%]

====================================================================================================== ERRORS =======================================================================================================
____________________________________________ ERROR at teardown of test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d] ____________________________________________

vname = 'time', var = <xarray.Variable (time: 1)>
array([1])
Attributes:
    units:     days since 2010-01-01
    calendar:  proleptic_gregorian
index = {Dimension(name='time', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=1, indexed=True, dimsize=10)}, zgroup = <zarr.hierarchy.Group '/'>

    def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None:
        zarr_array = zgroup[vname]
        # get encoding for variable from zarr attributes
        var_coded = var.copy()  # copy needed for test suit to avoid modifying inputs in-place
        var_coded.encoding.update(zarr_array.attrs)
        var_coded.attrs = {}
        var = xr.backends.zarr.encode_zarr_variable(var_coded)
        data = np.asarray(var.data)
        region = _region_for(var, index)
        # check that the region evenly overlaps the zarr chunks
        for dimsize, chunksize, region_slice in zip(zarr_array.shape, zarr_array.chunks, region):
            if region_slice.start is None:
                continue
            try:
>               assert region_slice.start % chunksize == 0
E               AssertionError

pangeo_forge_recipes/writers.py:45: AssertionError

During handling of the above exception, another exception occurred:

>   ???

apache_beam/runners/common.py:1418: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:838: in apache_beam.runners.common.PerWindowInvoker.invoke_process
    ???
apache_beam/runners/common.py:984: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    ???
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/transforms/core.py:-1: in <lambda>
    ???
pangeo_forge_recipes/writers.py:89: in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

vname = 'time', var = <xarray.Variable (time: 1)>
array([1])
Attributes:
    units:     days since 2010-01-01
    calendar:  proleptic_gregorian
index = {Dimension(name='time', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=1, indexed=True, dimsize=10)}, zgroup = <zarr.hierarchy.Group '/'>

    def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None:
        zarr_array = zgroup[vname]
        # get encoding for variable from zarr attributes
        var_coded = var.copy()  # copy needed for test suit to avoid modifying inputs in-place
        var_coded.encoding.update(zarr_array.attrs)
        var_coded.attrs = {}
        var = xr.backends.zarr.encode_zarr_variable(var_coded)
        data = np.asarray(var.data)
        region = _region_for(var, index)
        # check that the region evenly overlaps the zarr chunks
        for dimsize, chunksize, region_slice in zip(zarr_array.shape, zarr_array.chunks, region):
            if region_slice.start is None:
                continue
            try:
                assert region_slice.start % chunksize == 0
                assert (region_slice.stop % chunksize == 0) or (region_slice.stop == dimsize)
            except AssertionError:
>               raise ValueError(
                    f"Region {region} does not align with Zarr chunks {zarr_array.chunks}."
                )
E               ValueError: Region (slice(1, 2, None),) does not align with Zarr chunks (10,).

pangeo_forge_recipes/writers.py:48: ValueError

During handling of the above exception, another exception occurred:

    @pytest.fixture
    def pipeline():
        options = PipelineOptions(runtime_type_check=False)
>       with TestPipeline(options=options) as p:

tests/test_end_to_end.py:29: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:112: in run
    result = super().run(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/pipeline.py:553: in run
    self._options).run(False)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/pipeline.py:577: in run
    return self.runner.run_pipeline(self, self._options)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py:129: in run_pipeline
    return runner.run_pipeline(pipeline, options)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:202: in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:224: in run_via_runner_api
    return self.run_stages(stage_context, stages)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:455: in run_stages
    bundle_results = self._execute_bundle(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:783: in _execute_bundle
    self._run_bundle(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1012: in _run_bundle
    result, splits = bundle_manager.process_bundle(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1348: in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:379: in push
    response = self.worker.do_instruction(request)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:629: in do_instruction
    return getattr(self, request_type)(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:667: in process_bundle
    bundle_processor.process_bundle(instruction_id))
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py:1061: in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py:231: in process_encoded
    self.output(decoded_value)
apache_beam/runners/worker/operations.py:526: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:528: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:237: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:240: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:907: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:908: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1420: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1492: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1418: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:624: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1582: in apache_beam.runners.common._OutputHandler.handle_process_outputs
    ???
apache_beam/runners/common.py:1695: in apache_beam.runners.common._OutputHandler._write_value_to_tag
    ???
apache_beam/runners/worker/operations.py:240: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:907: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:908: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1420: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1508: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1418: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:838: in apache_beam.runners.common.PerWindowInvoker.invoke_process
    ???
apache_beam/runners/common.py:984: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    ???
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/transforms/core.py:-1: in <lambda>
    ???
pangeo_forge_recipes/writers.py:89: in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

vname = 'time', var = <xarray.Variable (time: 1)>
array([1])
Attributes:
    units:     days since 2010-01-01
    calendar:  proleptic_gregorian
index = {Dimension(name='time', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=1, indexed=True, dimsize=10)}, zgroup = <zarr.hierarchy.Group '/'>

    def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None:
        zarr_array = zgroup[vname]
        # get encoding for variable from zarr attributes
        var_coded = var.copy()  # copy needed for test suit to avoid modifying inputs in-place
        var_coded.encoding.update(zarr_array.attrs)
        var_coded.attrs = {}
        var = xr.backends.zarr.encode_zarr_variable(var_coded)
        data = np.asarray(var.data)
        region = _region_for(var, index)
        # check that the region evenly overlaps the zarr chunks
        for dimsize, chunksize, region_slice in zip(zarr_array.shape, zarr_array.chunks, region):
            if region_slice.start is None:
                continue
            try:
                assert region_slice.start % chunksize == 0
                assert (region_slice.stop % chunksize == 0) or (region_slice.stop == dimsize)
            except AssertionError:
>               raise ValueError(
                    f"Region {region} does not align with Zarr chunks {zarr_array.chunks}."
                )
E               ValueError: Region (slice(1, 2, None),) does not align with Zarr chunks (10,). [while running 'StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']

pangeo_forge_recipes/writers.py:48: ValueError
------------------------------------------------------------------------------------------------- Captured log call -------------------------------------------------------------------------------------------------
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:367 Discarding unparseable args: ['tests/test_end_to_end.py', '-k', 'test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d]']
----------------------------------------------------------------------------------------------- Captured log teardown -----------------------------------------------------------------------------------------------
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:367 Discarding unparseable args: ['tests/test_end_to_end.py', '-k', 'test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d]']
===================================================================================================== FAILURES ======================================================================================================
______________________________________________________ test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d] _______________________________________________________

vname = 'time', var = <xarray.Variable (time: 1)>
array([1])
Attributes:
    units:     days since 2010-01-01
    calendar:  proleptic_gregorian
index = {Dimension(name='time', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=1, indexed=True, dimsize=10)}, zgroup = <zarr.hierarchy.Group '/'>

    def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None:
        zarr_array = zgroup[vname]
        # get encoding for variable from zarr attributes
        var_coded = var.copy()  # copy needed for test suit to avoid modifying inputs in-place
        var_coded.encoding.update(zarr_array.attrs)
        var_coded.attrs = {}
        var = xr.backends.zarr.encode_zarr_variable(var_coded)
        data = np.asarray(var.data)
        region = _region_for(var, index)
        # check that the region evenly overlaps the zarr chunks
        for dimsize, chunksize, region_slice in zip(zarr_array.shape, zarr_array.chunks, region):
            if region_slice.start is None:
                continue
            try:
>               assert region_slice.start % chunksize == 0
E               AssertionError

pangeo_forge_recipes/writers.py:45: AssertionError

During handling of the above exception, another exception occurred:

>   ???

apache_beam/runners/common.py:1418: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:838: in apache_beam.runners.common.PerWindowInvoker.invoke_process
    ???
apache_beam/runners/common.py:984: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    ???
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/transforms/core.py:-1: in <lambda>
    ???
pangeo_forge_recipes/writers.py:89: in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

vname = 'time', var = <xarray.Variable (time: 1)>
array([1])
Attributes:
    units:     days since 2010-01-01
    calendar:  proleptic_gregorian
index = {Dimension(name='time', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=1, indexed=True, dimsize=10)}, zgroup = <zarr.hierarchy.Group '/'>

    def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None:
        zarr_array = zgroup[vname]
        # get encoding for variable from zarr attributes
        var_coded = var.copy()  # copy needed for test suit to avoid modifying inputs in-place
        var_coded.encoding.update(zarr_array.attrs)
        var_coded.attrs = {}
        var = xr.backends.zarr.encode_zarr_variable(var_coded)
        data = np.asarray(var.data)
        region = _region_for(var, index)
        # check that the region evenly overlaps the zarr chunks
        for dimsize, chunksize, region_slice in zip(zarr_array.shape, zarr_array.chunks, region):
            if region_slice.start is None:
                continue
            try:
                assert region_slice.start % chunksize == 0
                assert (region_slice.stop % chunksize == 0) or (region_slice.stop == dimsize)
            except AssertionError:
>               raise ValueError(
                    f"Region {region} does not align with Zarr chunks {zarr_array.chunks}."
                )
E               ValueError: Region (slice(1, 2, None),) does not align with Zarr chunks (10,).

pangeo_forge_recipes/writers.py:48: ValueError

During handling of the above exception, another exception occurred:

daily_xarray_dataset = <xarray.Dataset>
Dimensions:  (time: 10, lat: 18, lon: 36)
Coordinates:
  * time     (time) datetime64[ns] 2010-01-01 ...2 7 2
    foo      (time, lat, lon) float64 0.436 0.02593 0.5497 ... 0.9776 0.4635
Attributes:
    conventions:  CF 1.6
netcdf_local_file_pattern = <FilePattern {'time': 10}>, pipeline = <apache_beam.testing.test_pipeline.TestPipeline object at 0x17c41e110>
tmp_target_url = '/private/var/folders/mb/7d7yq_4j2qgdfm_j3j4tsyl40000gn/T/pytest-of-nrhagen/pytest-67/target.zarr0', target_chunks = {'time': 1}

    @pytest.mark.parametrize("target_chunks", [{"time": 1}, {"time": 2}, {"time": 3}])
    def test_xarray_zarr(
        daily_xarray_dataset,
        netcdf_local_file_pattern,
        pipeline,
        tmp_target_url,
        target_chunks,
    ):
        pattern = netcdf_local_file_pattern
>       with pipeline as p:

tests/test_end_to_end.py:42: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:112: in run
    result = super().run(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/pipeline.py:553: in run
    self._options).run(False)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/pipeline.py:577: in run
    return self.runner.run_pipeline(self, self._options)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py:129: in run_pipeline
    return runner.run_pipeline(pipeline, options)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:202: in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:224: in run_via_runner_api
    return self.run_stages(stage_context, stages)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:455: in run_stages
    bundle_results = self._execute_bundle(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:783: in _execute_bundle
    self._run_bundle(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1012: in _run_bundle
    result, splits = bundle_manager.process_bundle(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1348: in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:379: in push
    response = self.worker.do_instruction(request)
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:629: in do_instruction
    return getattr(self, request_type)(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:667: in process_bundle
    bundle_processor.process_bundle(instruction_id))
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py:1061: in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py:231: in process_encoded
    self.output(decoded_value)
apache_beam/runners/worker/operations.py:526: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:528: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:237: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:240: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:907: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:908: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1420: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1492: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1418: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:624: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1582: in apache_beam.runners.common._OutputHandler.handle_process_outputs
    ???
apache_beam/runners/common.py:1695: in apache_beam.runners.common._OutputHandler._write_value_to_tag
    ???
apache_beam/runners/worker/operations.py:240: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:907: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:908: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1420: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1508: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1418: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:838: in apache_beam.runners.common.PerWindowInvoker.invoke_process
    ???
apache_beam/runners/common.py:984: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    ???
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/transforms/core.py:-1: in <lambda>
    ???
pangeo_forge_recipes/writers.py:89: in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

vname = 'time', var = <xarray.Variable (time: 1)>
array([1])
Attributes:
    units:     days since 2010-01-01
    calendar:  proleptic_gregorian
index = {Dimension(name='time', operation=<CombineOp.CONCAT: 2>): IndexedPosition(value=1, indexed=True, dimsize=10)}, zgroup = <zarr.hierarchy.Group '/'>

    def _store_data(vname: str, var: xr.Variable, index: Index, zgroup: zarr.Group) -> None:
        zarr_array = zgroup[vname]
        # get encoding for variable from zarr attributes
        var_coded = var.copy()  # copy needed for test suit to avoid modifying inputs in-place
        var_coded.encoding.update(zarr_array.attrs)
        var_coded.attrs = {}
        var = xr.backends.zarr.encode_zarr_variable(var_coded)
        data = np.asarray(var.data)
        region = _region_for(var, index)
        # check that the region evenly overlaps the zarr chunks
        for dimsize, chunksize, region_slice in zip(zarr_array.shape, zarr_array.chunks, region):
            if region_slice.start is None:
                continue
            try:
                assert region_slice.start % chunksize == 0
                assert (region_slice.stop % chunksize == 0) or (region_slice.stop == dimsize)
            except AssertionError:
>               raise ValueError(
                    f"Region {region} does not align with Zarr chunks {zarr_array.chunks}."
                )
E               ValueError: Region (slice(1, 2, None),) does not align with Zarr chunks (10,). [while running 'StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']

pangeo_forge_recipes/writers.py:48: ValueError
------------------------------------------------------------------------------------------------- Captured log call -------------------------------------------------------------------------------------------------
WARNING  apache_beam.options.pipeline_options:pipeline_options.py:367 Discarding unparseable args: ['tests/test_end_to_end.py', '-k', 'test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d]']
================================================================================================= warnings summary ==================================================================================================
../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/hdfs/config.py:15
  /Users/nrhagen/.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/hdfs/config.py:15: DeprecationWarning: the imp module is deprecated in favour of importlib and slated for removal in Python 3.12; see the module's documentation for alternative uses
    from imp import load_source

../../../../.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:36
  /Users/nrhagen/.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:36: PytestCollectionWarning: cannot collect test class 'TestPipeline' because it has a __init__ constructor (from: tests/test_end_to_end.py)
    class TestPipeline(Pipeline):

tests/test_end_to_end.py::test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d]
  <frozen importlib._bootstrap>:241: RuntimeWarning: numpy.ndarray size changed, may indicate binary incompatibility. Expected 16 from C header, got 96 from PyObject

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
============================================================================================== short test summary info ==============================================================================================
FAILED tests/test_end_to_end.py::test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d] - ValueError: Region (slice(1, 2, None),) does not align with Zarr chunks (10,). [while running 'StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']
ERROR tests/test_end_to_end.py::test_xarray_zarr[netcdf_local_file_pattern_sequential-target_chunks0-netcdf_local_paths_sequential_1d] - ValueError: Region (slice(1, 2, None),) does not align with Zarr chunks (10,). [while running 'StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']
=============================================================================== 1 failed, 21 deselected, 3 warnings, 1 error in 2.92s ===============================================================================
Exception ignored in: <function File.close at 0x17f433760>
Traceback (most recent call last):
  File "/Users/nrhagen/.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/h5netcdf/core.py", line 1200, in close
  File "/Users/nrhagen/.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/h5py/_hl/files.py", line 585, in close
TypeError: bad operand type for unary ~: 'NoneType'
Exception ignored in: <function File.close at 0x17f433760>
Traceback (most recent call last):
  File "/Users/nrhagen/.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/h5netcdf/core.py", line 1200, in close
  File "/Users/nrhagen/.conda/envs/pangeo-forge-recipes/lib/python3.10/site-packages/h5py/_hl/files.py", line 585, in close
TypeError: bad operand type for unary ~: 'NoneType'

 ~/Documents/carbonplan/pangeo_forge/pangeo-forge-recipes | on coord_chunking *19 !4 ?1                                                                          took 6s | pangeo-forge-recipes py | at 12:01:42 PM 
> 

@cisaacstern
Copy link
Member

Thanks for getting this going, @norlandrhagen!

Looks like a great start. Please lmk if I can help.

@jbusecke
Copy link
Contributor

jbusecke commented Aug 5, 2023

I might be off here, but would it be possible that this is related to the issues I am having over at leap-stc/cmip6-leap-feedstock#4 (comment)? I am currently digging into what is causing those failures and will report in separate issue. Just wanted to flag this.

@abarciauskas-bgse
Copy link
Contributor

Thank you for looking into this @norlandrhagen !

@cisaacstern
Copy link
Member

I might be off here, but would it be possible that this is related to the issues I am having over at leap-stc/cmip6-leap-feedstock#4 (comment)? I am currently digging into what is causing those failures and will report in separate issue. Just wanted to flag this.

@jbusecke what issues are you referring to? And are you still thinking this may be related?

@@ -400,4 +421,7 @@ def expand(self, datasets: beam.PCollection) -> beam.PCollection:
target=self.get_full_target(), target_chunks=self.target_chunks
)
rechunked_datasets | StoreDatasetFragments(target_store=target_store)
if self.consolidate_coords:
ConsolidateDimensionCoordinates(target_store=target_store)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ConsolidateDimensionCoordinates(target_store=target_store)
target_store | ConsolidateDimensionCoordinates()

@norlandrhagen hmm maybe as a PTransform, ConsolidateDimensionCoordinates needs to take a PCollection as input (via |)? If so, we can probably do it like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good catch! Yeah that makes more sense.

@jbusecke
Copy link
Contributor

Maybe I was wrong. Ill investigate my issues with the dynamic chunking next week and report back there.

@review-notebook-app
Copy link

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

@norlandrhagen
Copy link
Contributor Author

Update from the work @cisaacstern and I did this morning.

  • In our testing, it seems that theConsolidateDimensionCoordinates needs to be run one only once after the dataset fragments have been written. In the current state, the StoreToZarr does not run sequentially. We need to find a way to enforce that ConsolidateDimensionCoordinates is ran after the Zarr store has been written.
  • Instead of passing target_store as a side input into ConsolidateDimensionCoordinates, we might be able to pass a single value PCollection using beam.combiners.Sample.FixedSizeGlobally(1) into ConsolidateDimensionCoordinates.
  • The end_to_end test of test_xarray_zarr_subpath seemed redundant, so it was converted to test_xarray_zarr_consolidate_coords. This needs to be built out to test if the consolidate_coords kwargs is consolidate the coordinate chunks.

I'm sure there is a lot I missed, so feel free to add @cisaacstern .

@cisaacstern
Copy link
Member

cisaacstern commented Aug 21, 2023

If I had to guess, the slow loading speed noted in leap-stc/ClimSim#38 (comment) could be partially alleviated by this PR. This is a case where we probably wouldn't want to re-run the entire pipeline, so we might run a one-off job applying the ConsolidateDimensionCoordinates transform to the existing store.

Edit: On further reflection, I'm actually not 100% sure if initial xarray dataset loading time should be expected to speed up with consolidated coordinate dims, or if that is purely addressable with #568. 🤔

@cisaacstern
Copy link
Member

Also @norlandrhagen I will try to fix the target_store sampling issue we ran into together in a separate PR, as described in #564, as this is now also a blocker for other work.

@cisaacstern
Copy link
Member

cisaacstern commented Aug 23, 2023

@norlandrhagen merge of #574 unblocks us here. I think it will be easiest if we finish this PR after #575 goes in. In anticipation of that being soon, could you clean up the Files Changed here?

It looks like for some reason a bunch of files in docs/ are included in the Files Changed currently, but that's not part of our work here. IIUC, we just changed a few things in pangeo_forge_recipes/ and tests/, so docs/ should be even with main?

Thanks so much! 🙏

@norlandrhagen
Copy link
Contributor Author

Great to hear! I'll get this PR cleaned up.

@norlandrhagen
Copy link
Contributor Author

Updates:

  • Piping SampleSingleton() to ConsolidateDimensionCoordinates().
  • Verified with pdb that the consolidate_dimension_coordinates path is being followed when self.consolidate_coords is True.
  • The incomplete test test_xarray_zarr_consolidate_coords in test_end_to_end.py is failing when consolidate_dimension_coordinates is True.
FAILED tests/test_end_to_end.py::test_xarray_zarr_consolidate_coords[netcdf3_local_paths_sequential_1d-True] - ValueError: Failed to decode variable 'time': destination buffer too small; expected at least 40, got 4
FAILED tests/test_end_to_end.py::test_xarray_zarr_consolidate_coords[netcdf_local_paths_sequential_2d-True] - ValueError: Failed to decode variable 'time': cannot reshape array of size 10 into shape (2,)
FAILED tests/test_end_to_end.py::test_xarray_zarr_consolidate_coords[netcdf_local_paths_sequential_1d-True] - ValueError: Failed to decode variable 'time': destination buffer too small; expected at least 80, got 8

@abarciauskas-bgse
Copy link
Contributor

@norlandrhagen thanks for all your work on this - I tried this branch out as I'm trying to benchmark different chunk configurations of NEX GDDP CMIP6 data on AWS and got some errors. Could you take a look and tell me if it's user error? https://nbviewer.org/gist/abarciauskas-bgse/e3b3b8b7dd2d8abb6be1e36a5c3b7678

@cisaacstern
Copy link
Member

cisaacstern commented Aug 28, 2023

Thanks for following the progress here, @abarciauskas-bgse.

In its current form, I would not expect this PR to work.

Raphael and I are planning to devote some further effort to this this week, we'll ping you here when it looks functional!

@norlandrhagen
Copy link
Contributor Author

Update from today after working with @cisaacstern

  • Consolidate metadata functionality from PR Provide consolidate_metadata option in StoreToZarr #575 was added to this PR.
  • consolidated=False was added into schema_to_zarr - This definitely needs review!
  • The fsspec rm functionality in 0.9.4 in the consolidate_dimension_coordinates equivalent was added back in. Note: from the Aug 28th meeting notes, @alxmrs has some great insight into the performance implications of cloud bulk deletes.

To Do:

  • test_xarray_zarr_consolidate_coords still needs work
  • The last bit of logic in StoreToZarr could probably be combined. @cisaacstern any thoughts here?
        singleton_target_store = (
            n_target_stores | SampleSingleton()
            if not self.consolidate_coords
            else n_target_stores | SampleSingleton() | ConsolidateDimensionCoordinates()
        )
        singleton_target_store = (
            n_target_stores | SampleSingleton()
            if not self.consolidate_metadata
            else n_target_stores | SampleSingleton() | ConsolidateZarrMetadata()
        )

@norlandrhagen
Copy link
Contributor Author

@abarciauskas-bgse we made some more progress, but not quite done yet!

@cisaacstern cisaacstern changed the title [WIP] ConsolidateDimensionCoordinates Transformation [WIP] Add ConsolidateDimensionCoordinates + ConsolidateZarrMetadata transforms Aug 31, 2023
store = zarr.open(os.path.join(tmp_target_url, "subpath"))

# fails
assert netcdf_local_file_pattern_sequential.dims["time"] == store.time.chunks[0]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@norlandrhagen what is the rationale behind this assert? It is not intuitive to me why these two values:

  1. Would be == to one-another; or
  2. How this would be evidence of coordinate consolidation

Additionally, in your view is the fact that this is currently failing evidence that the current implementation is wrong? Or that this assert is not checking the right thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cisaacstern I think it's the the assert isn't checking the right thing.

@ranchodeluxe
Copy link
Contributor

@norlandrhagen: I'm thinking about giving this branch a run. Is there anything I can help with on this ticket to get it merged besides testing it?

@norlandrhagen
Copy link
Contributor Author

@ranchodeluxe please do! It needs testing.

Comment on lines +508 to +509
consolidate_coords: bool = True
consolidate_metadata: bool = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'd prefer composition over configuration here. Is there a chance we can just expose the ConsolidateMetadata transform from the module.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting @sbquinlan. Is there an example that could illustrate the advantage of exposing these stages? Just looking at this I would think this looks good since in almost all my cases I would want to have those two steps happening by default, but I might be missing something here.

Copy link
Member

@sbquinlan sbquinlan Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jbusecke My point is basically I prefer:

recipe = (
    beam.Create(pattern.items())
    | OpenWithKerchunk(
        remote_protocol='s3' if selected_rel == S3_REL else 'https',
        file_type=pattern.file_type,
        storage_options=pattern.fsspec_open_kwargs,
    )
    | CombineReferences(
        concat_dims=pattern.concat_dims,
        identical_dims=IDENTICAL_DIMS,
        mzz_kwargs={'coo_map': {'time': get_time_from_attr}},
    )
    | ConsolidateMetadata(storage_options=pattern.fsspec_open_kwargs)
    | WriteReferences(
        store_name=SHORT_NAME,
        target_root=hacky_way_to_pull.target_root,
        storage_options=pattern.fsspec_open_kwargs,
    )
    | ValidateDatasetDimensions(expected_dims={'time': None, 'lat': (-50, 50), 'lon': (-180, 180)})

To:

recipe = (
    beam.Create(pattern.items())
    CreateZarrFromPattern(
      combine_references=True,
      consolidate_metadata=True,
      consolidate_coords=True,
   )

Hopefully that makes sense. @norlandrhagen already landed changes to make the former possible so that was very much appreciated and unblocks me. So I don't really have much of a strong opinion here anymore. If we want to provide "one-size-fits-all" transforms with StoreToZarr/WriteCombinedReferences that are configurable with boolean flags for people less familiar with Beam or Zarr internals, that's fine, as long as power users still have access to the individual transforms to customize as they see fit. Like if you need to tweak a step for CMIP6.

Again, no blocking feedback from me on this PR. Just need to figure out the tests and we're good to go. Might be work breaking up metadata and coords just to cut down on PR complexity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think putting it that way makes a lot of sense @sbquinlan.

@jbusecke
Copy link
Contributor

Hey folks. Thank you very much for working on this. Is there any way I can help here? I think this feature is quite essential for my CMIP6 data ingestion, so just wanted to offer my help if needed.

@norlandrhagen
Copy link
Contributor Author

Hey @jbusecke. This PR has been my pangeo-forge albatross. Any work on it would be appreciated! I just updated it with main and am getting some test failures, so it probably needs a bit of updating. It also is lacking any useful tests to see if the ConsolidateDimensionCoordinates is working.

@jbusecke
Copy link
Contributor

Would be happy to pair on this early next week or after Jan 6 if you like.

@norlandrhagen
Copy link
Contributor Author

After Jan 6th works good for me.

@jbusecke
Copy link
Contributor

Picking this back up after crawling through my gh inbox (Yay winter break). Since I am coming closer to releasing CMIP6 data on the public bucket, this is getting more important. Ill reach out on slack maybe?

@norlandrhagen
Copy link
Contributor Author

Replaced by this PR: #671

norlandrhagen added a commit that referenced this pull request Feb 8, 2024
ConsolidatedDimensionCoords Transform -- cleaned up version of PR #556
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants