Dask Delayed
============

.. meta::
    :description: The Dask delayed function decorates your functions so that they operate lazily. Rather than executing your function immediately, it will defer execution, placing the function and its arguments into a task graph.

.. toctree::
   :maxdepth: 1
   :hidden:

   delayed-collections.rst
   delayed-best-practices.rst

Sometimes problems don't fit into one of the collections like ``dask.array``
or ``dask.dataframe``. In these cases, users can parallelize custom
algorithms using the simpler ``dask.delayed`` interface. This allows you to
create graphs directly with a light annotation of normal Python code:

.. code-block:: python

   >>> import dask

   >>> x = dask.delayed(inc)(1)
   >>> y = dask.delayed(inc)(2)
   >>> z = dask.delayed(add)(x, y)
   >>> z.compute()
   5
   >>> z.visualize()

.. image:: images/inc-add.svg
   :alt: A Dask Delayed task graph with two "inc" functions combined using an "add" function resulting in an output node.

Example
-------

Visit https://examples.dask.org/delayed.html to see and run examples using
Dask Delayed.

Sometimes we face problems that are parallelizable, but don't fit into
high-level abstractions like Dask Array or Dask DataFrame. Consider the
following example:

.. code-block:: python

    def inc(x):
        return x + 1

    def double(x):
        return x * 2

    def add(x, y):
        return x + y

    data = [1, 2, 3, 4, 5]

    output = []
    for x in data:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)

    total = sum(output)

There is clearly parallelism in this problem (many of the ``inc``,
``double``, and ``add`` calls can be evaluated independently), but it's not
clear how to convert this to an array or DataFrame computation. As written,
this code runs sequentially in a single thread, even though much of it could
be executed in parallel.

The Dask ``delayed`` function decorates your functions so that they operate
*lazily*. Rather than executing your function immediately, it will defer
execution, placing the function and its arguments into a task graph.

.. currentmodule:: dask.delayed

.. autosummary::
    delayed

We slightly modify our code by wrapping each function call in ``delayed``.
This delays the execution of the function and generates a Dask graph instead:

.. code-block:: python

    import dask

    output = []
    for x in data:
        a = dask.delayed(inc)(x)
        b = dask.delayed(double)(x)
        c = dask.delayed(add)(a, b)
        output.append(c)

    total = dask.delayed(sum)(output)

We used the ``dask.delayed`` function to wrap the function calls that we want
to turn into tasks. None of the ``inc``, ``double``, ``add``, or ``sum``
calls have happened yet. Instead, the object ``total`` is a ``Delayed``
result that contains a task graph of the entire computation. Looking at the
graph we see clear opportunities for parallel execution. The
:doc:`Dask schedulers <scheduling>` will exploit this parallelism, generally
improving performance (although not in this example, because these functions
are already very small and fast).

.. code-block:: python

   total.visualize()  # see image to the right

.. image:: images/delayed-inc-double-add.svg
   :align: right
   :alt: A task graph with many nodes for "inc" and "double" that combine with "add" nodes. The output of the "add" nodes finally aggregate with a "sum" node.

We can now compute this lazy result to execute the graph in parallel:

.. code-block:: python

   >>> total.compute()
   50

Decorator
---------

It is also common to see the delayed function used as a decorator. Here is a
reproduction of our original problem as parallel code:

.. code-block:: python

    import dask

    @dask.delayed
    def inc(x):
        return x + 1

    @dask.delayed
    def double(x):
        return x * 2

    @dask.delayed
    def add(x, y):
        return x + y

    data = [1, 2, 3, 4, 5]

    output = []
    for x in data:
        a = inc(x)
        b = double(x)
        c = add(a, b)
        output.append(c)

    total = dask.delayed(sum)(output)
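As before, nothing has run yet: calling the decorated functions immediately
returns lazy ``Delayed`` objects rather than results. As a minimal sketch,
reusing ``total`` and ``output`` from the code above, you can trigger
execution with ``compute``, or evaluate several ``Delayed`` objects in one
pass with the top-level ``dask.compute`` function:

.. code-block:: python

   >>> total.compute()  # executes the whole task graph
   50

   >>> dask.compute(*output)  # computes many Delayed objects together
   (4, 7, 10, 13, 16)

Computing many results in a single ``dask.compute`` call lets Dask share any
common intermediate tasks instead of recomputing them per result.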
Real time
---------

Sometimes you want to create and destroy work during execution, launch tasks
from other tasks, etc. For this, see the :doc:`Futures <futures>` interface.

Best Practices
--------------

For a list of common problems and recommendations see
:doc:`Delayed Best Practices <delayed-best-practices>`.

Indirect Dependencies
---------------------

Sometimes you might find yourself wanting to add a dependency to a task that
does not take the result of that dependency as an input, for example when a
task depends on the side effect of another task. In these cases you can use
``dask.graph_manipulation.bind``.

.. code-block:: python

    import dask
    from dask.graph_manipulation import bind

    DATA = []

    @dask.delayed
    def inc(x):
        return x + 1

    @dask.delayed
    def add_data(x):
        DATA.append(x)

    @dask.delayed
    def sum_data(x):
        return sum(DATA) + x

    a = inc(1)
    b = add_data(a)
    c = inc(3)
    d = add_data(c)
    e = inc(5)
    f = bind(sum_data, [b, d])(e)
    f.compute()

``sum_data`` will operate on ``DATA`` only after both of the expected items
have been appended to it. ``bind`` can also be used along with direct
dependencies passed through the function arguments.

Execution
---------

By default, Dask Delayed uses the threaded scheduler in order to avoid data
transfer costs. Consider using the `multiprocessing`_ scheduler or the
`dask.distributed`_ scheduler, on a local machine or on a cluster, if your
code does not `release the GIL well`_ (computations that are dominated by
pure Python code, or computations that wrap external code and hold onto the
GIL).

.. _`dask.distributed`: https://distributed.dask.org/en/latest/
.. _`multiprocessing`: https://docs.dask.org/en/stable/scheduling.html#local-processes
.. _`release the GIL well`: https://docs.dask.org/en/stable/graphs.html#avoid-holding-the-gil
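As a minimal sketch of selecting a scheduler, reusing the ``total`` object
from the examples above (the ``scheduler=`` keyword and ``dask.config.set``
are standard Dask options; the ``Client`` example assumes the optional
``distributed`` package is installed):

.. code-block:: python

   import dask

   # Per-call: run this one computation on the multiprocessing scheduler
   total.compute(scheduler="processes")

   # Global default: subsequent .compute() calls also use processes
   dask.config.set(scheduler="processes")

   # Or start a local dask.distributed cluster; once a Client exists,
   # it becomes the default scheduler for .compute()
   from dask.distributed import Client

   client = Client()
   total.compute()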