maximum recursion with dask and pydap backend #4348

Open
@jhamman

Description

What happened:

I'm getting a maximum recursion error when using the pydap backend with Dask distributed. It seems we're failing to pickle the pydap backend store.
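
The failure can likely be reproduced without Dask by round-tripping a pydap dataset through pickle. A minimal sketch (I'm inferring this from the worker traceback below, which bottoms out in pydap/model.py inside pickle.loads):

import pickle
from pydap.client import open_url

url = ('http://thredds.northwestknowledge.net:8080/thredds/dodsC/'
       'agg_terraclimate_pet_1958_CurrentYear_GLOBE.nc')
pds = open_url(url)
# dumps appears to succeed; loads should hit the same RecursionError
# in pydap's __getattr__ that the workers report below.
pickle.loads(pickle.dumps(pds))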

What you expected to happen:

Successful parallel loading of the OPeNDAP dataset.

Minimal Complete Verifiable Example:

import xarray as xr
from dask.distributed import Client

client = Client()

ds = xr.open_dataset('http://thredds.northwestknowledge.net:8080/thredds/dodsC/agg_terraclimate_pet_1958_CurrentYear_GLOBE.nc',
                     engine='pydap', chunks={'lat': 1024, 'lon': 1024, 'time': 12}).load()

yields:

A KilledWorker error on the client:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input> in <module>
      4 client = Client()
      5
----> 6 ds = xr.open_dataset('http://thredds.northwestknowledge.net:8080/thredds/dodsC/agg_terraclimate_pet_1958_CurrentYear_GLOBE.nc',
      7                      engine='pydap', chunks={'lat': 1024, 'lon': 1024, 'time': 12}).load()

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/xarray/core/dataset.py in load(self, **kwargs)
652
653 # evaluate all the dask arrays simultaneously
--> 654 evaluated_data = da.compute(*lazy_data.values(), **kwargs)
655
656 for k, data in zip(lazy_data, evaluated_data):

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
435 keys = [x.dask_keys() for x in collections]
436 postcomputes = [x.dask_postcompute() for x in collections]
--> 437 results = schedule(dsk, keys, **kwargs)
438 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
439

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2594 should_rejoin = False
2595 try:
-> 2596 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2597 finally:
2598 for f in futures.values():

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1886 else:
1887 local_worker = None
-> 1888 return self.sync(
1889 self._gather,
1890 futures,

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
775 return future
776 else:
--> 777 return sync(
778 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
779 )

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
346 if error[0]:
347 typ, exc, tb = error[0]
--> 348 raise exc.with_traceback(tb)
349 else:
350 return result[0]

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/utils.py in f()
330 if callback_timeout is not None:
331 future = asyncio.wait_for(future, callback_timeout)
--> 332 result[0] = yield future
333 except Exception as exc:
334 error[0] = sys.exc_info()

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/tornado/gen.py in run(self)
733
734 try:
--> 735 value = future.result()
736 except Exception:
737 exc_info = sys.exc_info()

~/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1751 exc = CancelledError(key)
1752 else:
-> 1753 raise exception.with_traceback(traceback)
1754 raise exc
1755 if errors == "skip":

KilledWorker: ('open_dataset-54c87cd25bf4e9df37cb3030e6602974pet-d39db76f8636f3803611948183e52c13', <Worker 'tcp://127.0.0.1:57343', name: 0, memory: 0, processing: 1>)

and the above-mentioned recursion error on the workers:

distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://127.0.0.1:57334
distributed.worker - INFO - -------------------------------------------------
distributed.worker - ERROR - maximum recursion depth exceeded
Traceback (most recent call last):
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/worker.py", line 931, in handle_scheduler
    await self.handle_stream(
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/core.py", line 455, in handle_stream
    msgs = await comm.read()
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/comm/tcp.py", line 211, in read
    msg = await from_frames(
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/comm/utils.py", line 75, in from_frames
    res = _from_frames()
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/comm/utils.py", line 60, in _from_frames
    return protocol.loads(
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/protocol/core.py", line 130, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 269, in deserialize
    return loads(header, frames)
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 59, in pickle_loads
    return pickle.loads(b"".join(frames))
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/pydap/model.py", line 235, in __getattr__
    return self.attributes[attr]
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/pydap/model.py", line 235, in __getattr__
    return self.attributes[attr]
  File "/Users/jhamman/miniconda3/envs/carbonplan38/lib/python3.8/site-packages/pydap/model.py", line 235, in __getattr__
    return self.attributes[attr]
  [Previous line repeated 973 more times]
RecursionError: maximum recursion depth exceeded
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
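
The recursion looks like the classic __getattr__/pickle trap: while unpickling, pickle probes the half-restored instance for __setstate__ before its __dict__ has been populated, so the lookup of self.attributes inside pydap's __getattr__ fails and re-enters __getattr__ without bound. A minimal stand-in for the pattern at pydap/model.py line 235 (illustrative only, not pydap's actual class):

import pickle

class Proxy:
    # Illustrative stand-in for pydap.model's attribute proxying;
    # not pydap's real class.
    def __init__(self):
        self.attributes = {}

    def __getattr__(self, attr):
        # Called only when normal attribute lookup fails. During
        # unpickling, pickle looks up __setstate__ on an instance whose
        # __dict__ is still empty, so 'attributes' itself is missing
        # here and the access below re-enters __getattr__ forever.
        try:
            return self.attributes[attr]
        except KeyError:
            raise AttributeError(attr)

pickle.loads(pickle.dumps(Proxy()))  # RecursionError: maximum recursion depth exceeded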

Anything else we need to know?:

I've found this to be reproducible with a few kinds of Dask clusters. Setting Client(processes=False) does work around the problem, at the expense of multiprocessing.
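
For anyone hitting this, the threads-only workaround looks like:

from dask.distributed import Client

# Threads share a single process, so the pydap store never has to be
# pickled and shipped to workers.
client = Client(processes=False)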

Environment:

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.8.2 | packaged by conda-forge | (default, Mar 5 2020, 16:54:44)
[Clang 9.0.1 ]
python-bits: 64
OS: Darwin
OS-release: 19.5.0
machine: x86_64
processor: i386
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.5
libnetcdf: 4.7.3

xarray: 0.15.1
pandas: 1.0.3
numpy: 1.18.1
scipy: 1.4.1
netCDF4: 1.5.3
pydap: installed
h5netcdf: 0.8.0
h5py: 2.10.0
Nio: None
zarr: 2.4.0
cftime: 1.1.1.2
nc_time_axis: 1.2.0
PseudoNetCDF: None
rasterio: 1.0.28
cfgrib: None
iris: None
bottleneck: 1.3.2
dask: 2.13.0
distributed: 2.13.0
matplotlib: 3.2.1
cartopy: 0.17.0
seaborn: 0.10.0
numbagg: installed
setuptools: 46.1.3.post20200325
pip: 20.0.2
conda: installed
pytest: 5.4.1
IPython: 7.13.0
sphinx: 3.1.1
