Sometimes problems don’t fit into one of the collections like
dask.dataframe. In these cases, users can parallelize custom algorithms
using the simpler
dask.delayed interface. This allows you to create graphs
directly with a light annotation of normal Python code:
>>> x = dask.delayed(inc)(1)
>>> y = dask.delayed(inc)(2)
>>> z = dask.delayed(add)(x, y)
>>> z.compute()
5
>>> z.visualize()
Visit https://examples.dask.org/delayed.html to see and run examples using Dask Delayed.
Sometimes we face problems that are parallelizable, but don’t fit into high-level abstractions like Dask Array or Dask DataFrame. Consider the following example:
def inc(x):
    return x + 1

def double(x):
    return x * 2

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
There is clearly parallelism in this problem (many of the
inc, double, and add calls can be evaluated independently), but it’s not
clear how to convert this to an array or DataFrame computation. As written,
this code runs sequentially in a single thread. However, we see that a
lot of this could be executed in parallel.
The Dask delayed function decorates your functions so that they operate
lazily. Rather than executing your function immediately, it will defer
execution, placing the function and its arguments into a task graph.
We slightly modify our code by wrapping functions in dask.delayed.
This delays the execution of the function and generates a Dask graph instead:
import dask

output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
We used the
dask.delayed function to wrap the function calls that we want
to turn into tasks. None of the inc, double, add, or sum calls
have happened yet. Instead, the object
total is a
Delayed result that
contains a task graph of the entire computation. Looking at the graph we see
clear opportunities for parallel execution. The Dask schedulers
will exploit this parallelism, generally improving performance (although not in this
example, because these functions are already very small and fast).
total.visualize()  # render the task graph as an image
We can now compute this lazy result to execute the graph in parallel:
>>> total.compute()
50
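Because everything in output is itself a Delayed object, you can also evaluate several results in one pass with dask.compute (part of the public Dask API), which runs any shared intermediate tasks only once:

>>> dask.compute(*output)
(4, 7, 10, 13, 16)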
It is also common to see the delayed function used as a decorator. Here is a reproduction of our original problem as parallel code:
import dask

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def double(x):
    return x * 2

@dask.delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
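As before, nothing has executed yet; the decorated functions build the same task graph, and computing total gives the same answer:

>>> total.compute()
50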
Sometimes you want to create and destroy work during execution, launch tasks from other tasks, etc. For this, see the Futures interface.
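As a minimal sketch of that interface (assuming dask.distributed is installed; Client() with no arguments starts a local cluster), tasks are submitted eagerly and return Futures:

from dask.distributed import Client

def inc(x):
    return x + 1

client = Client()                # start a local cluster
future = client.submit(inc, 10)  # runs immediately on a worker, returns a Future
print(future.result())           # 11
client.close()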
For a list of common problems and recommendations see Delayed Best Practices.
Sometimes you might find yourself wanting to add a dependency to a task that does
not take the result of that dependency as an input, for example when a task depends
on the side effect of another task. In these cases you can use
dask.graph_manipulation.bind:
import dask
from dask.graph_manipulation import bind

DATA = []

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def add_data(x):
    DATA.append(x)

@dask.delayed
def sum_data(x):
    return sum(DATA) + x

a = inc(1)
b = add_data(a)
c = inc(3)
d = add_data(c)
e = inc(5)
f = bind(sum_data, [b, d])(e)  # sum_data runs only after b and d complete
f.compute()                    # 12: DATA holds 2 and 4, and e == 6
sum_data will operate on DATA only after both the expected items have
been appended to it.
bind can also be used along with direct dependencies
passed through the function arguments.
By default, Dask Delayed uses the threaded scheduler in order to avoid data transfer costs. Consider using the multiprocessing scheduler or the dask.distributed scheduler, on a local machine or on a cluster, if your code does not release the GIL well (computations dominated by pure Python code, or computations that wrap external code and hold onto the GIL).
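For example, the scheduler can be selected per call or set globally; the names below are standard Dask scheduler options, and creating a distributed Client registers it as the default scheduler:

import dask

total.compute(scheduler="threads")      # the default for Dask Delayed
total.compute(scheduler="processes")    # multiprocessing; sidesteps the GIL

dask.config.set(scheduler="processes")  # set a default for all computations

# For a local or remote cluster (requires dask.distributed):
# from dask.distributed import Client
# client = Client()  # once created, becomes the default scheduler
# total.compute()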