Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RuntimeError when number of workers changes #59

Open
Simon-van-Diepen opened this issue Jul 28, 2023 · 0 comments
Open

RuntimeError when number of workers changes #59

Simon-van-Diepen opened this issue Jul 28, 2023 · 0 comments

Comments

@Simon-van-Diepen
Copy link

When the number of workers changes while a large job (more than 500,000 tasks) is being processed, the following RuntimeError sometimes appears and the computation stops:

RuntimeError                              Traceback (most recent call last)
Cell In[61], line 2
      1 if check_step(6):
----> 2     computed_statistics = dask.compute(all_statistics, traverse=True)[0]

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:595, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    592     keys.append(x.__dask_keys__())
    593     postcomputes.append(x.__dask_postcompute__())
--> 595 results = schedule(dsk, keys, **kwargs)
    596 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/client.py:3227, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3225         should_rejoin = False
   3226 try:
-> 3227     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3228 finally:
   3229     for f in futures.values():

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/client.py:2361, in Client.gather(self, futures, errors, direct, asynchronous)
   2359 else:
   2360     local_worker = None
-> 2361 return self.sync(
   2362     self._gather,
   2363     futures,
   2364     errors=errors,
   2365     direct=direct,
   2366     local_worker=local_worker,
   2367     asynchronous=asynchronous,
   2368 )

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    349     return future
    350 else:
--> 351     return sync(
    352         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    353     )

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
    416 if error:
    417     typ, exc, tb = error
--> 418     raise exc.with_traceback(tb)
    419 else:
    420     return result

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/utils.py:391, in sync.<locals>.f()
    389         future = wait_for(future, callback_timeout)
    390     future = asyncio.ensure_future(future)
--> 391     result = yield future
    392 except Exception:
    393     error = sys.exc_info()

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/distributed/client.py:2224, in Client._gather(self, futures, errors, direct, local_worker)
   2222         exc = CancelledError(key)
   2223     else:
-> 2224         raise exception.with_traceback(traceback)
   2225     raise exc
   2226 if errors == "skip":

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/optimization.py:992, in __call__()
    990 if not len(args) == len(self.inkeys):
    991     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 992 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/core.py:151, in get()
    149 for key in toposort(dsk):
    150     task = dsk[key]
--> 151     result = _execute_task(task, cache)
    152     cache[key] = result
    153 result = _execute_task(out, cache)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/core.py:121, in _execute_task()
    117     func, args = arg[0], arg[1:]
    118     # Note: Don't assign the subtask results to a variable. numpy detects
    119     # temporaries by their reference count and can execute certain
    120     # operations in-place.
--> 121     return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):
    123     return arg

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/dataframe/methods.py:357, in assign()
    355 df = df.copy(deep=bool(deep))
    356 for name, val in pairs.items():
--> 357     df[name] = val
    358 return df

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/pandas/core/frame.py:3950, in __setitem__()
   3947     self._setitem_array([key], value)
   3948 else:
   3949     # set column
-> 3950     self._set_item(key, value)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/pandas/core/frame.py:4143, in _set_item()
   4133 def _set_item(self, key, value) -> None:
   4134     """
   4135     Add series to DataFrame in specified column.
   4136 
   (...)
   4141     ensure homogeneity.
   4142     """
-> 4143     value = self._sanitize_column(value)
   4145     if (
   4146         key in self.columns
   4147         and value.ndim == 1
   4148         and not is_extension_array_dtype(value)
   4149     ):
   4150         # broadcast across multiple columns if necessary
   4151         if not self.columns.is_unique or isinstance(self.columns, MultiIndex):

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/pandas/core/frame.py:4871, in _sanitize_column()
   4869 if is_list_like(value):
   4870     com.require_length_match(value, self.index)
-> 4871 return sanitize_array(value, self.index, copy=True, allow_2d=True)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/pandas/core/construction.py:580, in sanitize_array()
    576         subarr = _try_cast(data, dtype, copy)
    578 elif hasattr(data, "__array__"):
    579     # e.g. dask array GH#38645
--> 580     data = np.array(data, copy=copy)
    581     return sanitize_array(
    582         data,
    583         index=index,
   (...)
    586         allow_2d=allow_2d,
    587     )
    589 else:

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/array/core.py:1701, in __array__()
   1700 def __array__(self, dtype=None, **kwargs):
-> 1701     x = self.compute()
   1702     if dtype and x.dtype != dtype:
   1703         x = x.astype(dtype)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:310, in compute()
    286 def compute(self, **kwargs):
    287     """Compute this dask collection
    288 
    289     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    308     dask.compute
    309     """
--> 310     (result,) = compute(self, traverse=False, **kwargs)
    311     return result

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:583, in compute()
    580 if not collections:
    581     return args
--> 583 schedule = get_scheduler(
    584     scheduler=scheduler,
    585     collections=collections,
    586     get=get,
    587 )
    589 dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
    590 keys, postcomputes = [], []

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:1398, in get_scheduler()
   1394     # else:  # try to connect to remote scheduler with this name
   1395     #     return get_client(scheduler).get
   1397 if config.get("scheduler", None):
-> 1398     return get_scheduler(scheduler=config.get("scheduler", None))
   1400 if config.get("get", None):
   1401     raise ValueError(get_err_msg)

File ~/mambaforge/envs/jupyter_dask/lib/python3.11/site-packages/dask/base.py:1373, in get_scheduler()
   1371 elif scheduler in ("dask.distributed", "distributed"):
   1372     if not client_available:
-> 1373         raise RuntimeError(
   1374             f"Requested {scheduler} scheduler but no Client active."
   1375         )
   1376     from distributed.worker import get_client
   1378     return get_client().get

RuntimeError: Requested dask.distributed scheduler but no Client active.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant