Dask is a versatile tool that supports a variety of workloads. This page contains brief, illustrative examples of how people use Dask in practice. It emphasizes breadth and aims to inspire readers to find new ways that Dask can serve them beyond their original intent.
Dask use cases can be roughly divided into the following two categories:
- Large NumPy/Pandas/Lists with dask.array, dask.dataframe, and dask.bag to analyze large datasets with familiar techniques. This is similar to databases, Spark, or big array libraries.
- Custom task scheduling. You submit a graph of functions that depend on each other for custom workloads. This is similar to Luigi, Airflow, Celery, or Makefiles.
Most people today approach Dask assuming it is a framework like Spark, designed for the first use case around large collections of uniformly shaped data. However, many of the more productive and novel use cases fall into the second category, using Dask to parallelize custom workflows.
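To make the second category concrete, here is a minimal sketch of a hand-written task graph. It assumes Dask's dictionary graph format, in which each key maps either to a plain value or to a tuple holding a function followed by its arguments. The functions `inc` and `add` and the key names are made up for illustration.

```python
from dask.threaded import get


def inc(x):
    return x + 1


def add(x, y):
    return x + y


# Keys name results; tuples describe calls, and string arguments refer to other keys.
graph = {
    "a": 1,
    "b": (inc, "a"),           # b = inc(a)
    "c": (inc, "b"),           # c = inc(b)
    "total": (add, "b", "c"),  # total = add(b, c)
}

# The threaded scheduler resolves dependencies and runs tasks on a thread pool.
total = get(graph, "total")
```

In practice most users never write such dictionaries by hand; dask.delayed and the collections build them automatically.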
Dask compute environments can be divided into the following two categories:
- Single machine parallelism with threads or processes: The Dask single-machine scheduler leverages the full CPU power of a laptop or a large workstation and changes the space limitation from “fits in memory” to “fits on disk”. This scheduler is simple to use and doesn’t have the computational or conceptual overhead of most “big data” systems.
- Distributed cluster parallelism on multiple nodes: The Dask distributed scheduler coordinates the actions of multiple machines on a cluster. It scales anywhere from a single machine to a thousand machines, but not significantly beyond.
The single machine scheduler is useful to more individuals (more people have personal laptops than have access to clusters) and probably accounts for 80+% of the use of Dask today. The distributed scheduler is useful to larger organizations like universities, research labs, or private companies.
Below we give specific examples of how people use Dask. We start with large NumPy/Pandas/List examples because they’re somewhat more familiar to people looking at “big data” frameworks. We then follow with custom scheduling examples, which tend to be applicable more often, and are arguably a bit more interesting.
Dask contains large parallel collections for n-dimensional arrays (similar to NumPy), dataframes (similar to Pandas), and lists (similar to PyToolz or PySpark).
On disk arrays
Scientists studying the earth have 10GB to 100GB of regularly gridded weather data on their laptop’s hard drive, stored as many individual HDF5 or NetCDF files. They use dask.array to treat this stack of HDF5 or NetCDF files as a single NumPy array (or a collection of NumPy arrays with the XArray project). They slice, perform reductions, compute seasonal averages, and so on, all with plain NumPy syntax. These computations take a few minutes (reading 100GB from disk is somewhat slow), but previously infeasible computations become convenient from the comfort of a personal laptop.
It’s not so much parallel computing that is valuable here but rather the ability to comfortably compute on larger-than-memory data without special hardware.
import h5py
dataset = h5py.File('myfile.hdf5')['/x']
import dask.array as da
x = da.from_array(dataset, chunks=dataset.chunks)
y = x[::10] - x.mean(axis=0)
y.compute()
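The idea of treating many files as one array can be sketched without any files on disk. The example below is an illustration only: small in-memory NumPy arrays stand in for the per-file datasets (in practice each would be an HDF5 or NetCDF variable), and the shapes and chunk sizes are arbitrary.

```python
import numpy as np
import dask.array as da

# Stand-ins for five per-file datasets, each with 4 time steps and 3 grid points.
files = [np.full((4, 3), fill_value=i, dtype="float64") for i in range(5)]

# Wrap each "file" lazily, then join them along the first (time) axis.
pieces = [da.from_array(arr, chunks=(2, 3)) for arr in files]
stacked = da.concatenate(pieces, axis=0)

# Nothing is computed until compute() is called; the mean is evaluated chunk by chunk.
column_means = stacked.mean(axis=0).compute()
```

Because the work is done one chunk at a time, the same pattern applies when the combined array is larger than memory.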
Directory of CSV or tabular HDF files
Analysts studying time series data have a large directory of CSV, HDF, or otherwise formatted tabular files. They usually use Pandas for this kind of data, but either the volume is too large or dealing with a large number of files is confusing. They use dask.dataframe to wrap all of these files into one logical dataframe that is built on demand to save space. Most of their Pandas workflow stays the same (dask.dataframe implements a subset of the Pandas API), so they switch from Pandas to dask.dataframe and back without significantly changing their code.
import dask.dataframe as dd
df = dd.read_csv('data/2016-*.*.csv', parse_dates=['timestamp'])
df.groupby(df.timestamp.dt.hour).value.mean().compute()
Directory of CSV files on HDFS
The same analyst as above uses dask.dataframe with the dask.distributed scheduler to analyze terabytes of data on their institution’s Hadoop cluster straight from Python. This uses either the hdfs3 or pyarrow Python libraries for HDFS management.
This solution is particularly attractive because it stays within the Python ecosystem, and uses the speed and algorithm set of Pandas, a tool with which the analyst is already very comfortable.
from dask.distributed import Client
client = Client('cluster-address:8786')
import dask.dataframe as dd
df = dd.read_csv('hdfs://data/2016-*.*.csv', parse_dates=['timestamp'])
df.groupby(df.timestamp.dt.hour).value.mean().compute()
Directories of custom format files
The same analyst has a bunch of files in a custom format not supported by dask.dataframe, or perhaps these files are in a directory structure that encodes important information about their data (such as the date or other metadata). They use dask.delayed to teach dask.dataframe how to load the data and then pass the result to dask.dataframe for tabular algorithms.
- Example Notebook: https://gist.github.com/mrocklin/e7b7b3a65f2835cda813096332ec73ca
Data engineers with click stream data from a website, or mechanical engineers with telemetry data from mechanical instruments, have large volumes of data in JSON or some other semi-structured format. They use dask.bag to manipulate many Python objects in parallel, either on their personal machine, where they stream the data through memory, or across a cluster.
import dask.bag as db
import json
records = db.read_text('data/2015-*-*.json').map(json.loads)
records.filter(lambda d: d['name'] == 'Alice').pluck('id').frequencies()
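The same pipeline can be tried without any files. This sketch replaces the text files with a short in-memory list of JSON strings (invented sample records) and calls compute explicitly, since bag operations are lazy. The threaded scheduler is chosen here only to keep the example lightweight.

```python
import json
import dask.bag as db

# Invented sample records standing in for lines read from JSON files.
lines = [
    '{"name": "Alice", "id": 1}',
    '{"name": "Bob", "id": 2}',
    '{"name": "Alice", "id": 1}',
    '{"name": "Alice", "id": 3}',
]

records = db.from_sequence(lines, npartitions=2).map(json.loads)

# frequencies() yields (item, count) pairs once computed.
counts = (
    records.filter(lambda d: d["name"] == "Alice")
    .pluck("id")
    .frequencies()
    .compute(scheduler="threads")
)
alice_counts = dict(counts)
```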
The large collections (array, dataframe, bag) are wonderful when they fit the application, for example if you want to perform a groupby on a directory of CSV data. However, several parallel computing applications don’t fit neatly into one of these higher level abstractions. Fortunately, Dask provides a wide variety of ways to parallelize more custom applications. These use the same machinery as the arrays and dataframes, but allow the user to develop custom algorithms specific to their problem.
Embarrassingly parallel computation
A programmer has a function that they want to run many times on different inputs. Their function and inputs might use arrays or dataframes internally, but conceptually their problem isn’t a single large array or dataframe.
They want to run these functions in parallel on their laptop while they prototype but they also intend to eventually use an in-house cluster. They wrap their function in dask.delayed and let the appropriate dask scheduler parallelize and load balance the work.
def process(data):
    ...
    return ...
Normal Sequential Processing:
results = [process(x) for x in inputs]
Build Dask Computation:
from dask import compute, delayed
values = [delayed(process)(x) for x in inputs]
# Run on multiple threads
results = compute(*values, scheduler='threads')
# Run on multiple processes
results = compute(*values, scheduler='processes')
# Run on a distributed cluster
from dask.distributed import Client
client = Client("cluster-address:8786")
results = compute(*values)  # a Client registers itself as the default scheduler
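As a self-contained illustration of the pattern above, the following sketch uses a trivial stand-in for `process` (squaring a number, chosen only so the example runs) and checks that the lazy version gives the same answers as the sequential loop under two local schedulers.

```python
from dask import compute, delayed


def process(x):
    # Hypothetical stand-in for an expensive, independent computation.
    return x ** 2


inputs = list(range(8))

# Normal sequential processing.
sequential = [process(x) for x in inputs]

# Build the lazy computation once, then pick a scheduler at compute time.
values = [delayed(process)(x) for x in inputs]
threaded = compute(*values, scheduler="threads")
single = compute(*values, scheduler="synchronous")  # handy for debugging
```

Only the `scheduler=` argument changes between runs; the task definitions stay the same.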
A financial analyst has many models that depend on each other in a complex web of computations.
data = [load(fn) for fn in filenames]
reference = load_from_database(query)
A = [model_a(x, reference) for x in data]
B = [model_b(x, reference) for x in data]
roll_A = [roll(A[i], A[i + 1]) for i in range(len(A) - 1)]
roll_B = [roll(B[i], B[i + 1]) for i in range(len(B) - 1)]
compare = [compare_ab(a, b) for a, b in zip(A, B)]
results = summarize(compare, roll_A, roll_B)
These models are time consuming and need to be run on a variety of inputs and situations. The analyst currently has their code as a collection of Python functions and is trying to figure out how to parallelize such a codebase. They use dask.delayed to wrap their function calls and capture the implicit parallelism.
from dask import compute, delayed
data = [delayed(load)(fn) for fn in filenames]
reference = delayed(load_from_database)(query)
A = [delayed(model_a)(x, reference) for x in data]
B = [delayed(model_b)(x, reference) for x in data]
roll_A = [delayed(roll)(A[i], A[i + 1]) for i in range(len(A) - 1)]
roll_B = [delayed(roll)(B[i], B[i + 1]) for i in range(len(B) - 1)]
compare = [delayed(compare_ab)(a, b) for a, b in zip(A, B)]
lazy_results = delayed(summarize)(compare, roll_A, roll_B)
They then depend on the dask schedulers to run this complex web of computations in parallel.
results = compute(lazy_results)
They appreciate how easy it was to transition from the experimental code to a scalable parallel version. The code is also simple enough for their teammates to understand and extend in the future.
A graduate student in machine learning is prototyping novel parallel algorithms. They are in a situation much like the financial analyst above except that they need to benchmark and profile their computation heavily under a variety of situations and scales. The dask profiling tools (single machine diagnostics and distributed diagnostics) provide the feedback they need to understand their parallel performance, including how long each task takes, how intense communication is, and their scheduling overhead. They scale their algorithm between 1 and 50 cores on single workstations and then scale out to a cluster running their computation at thousands of cores. They don’t have access to an institutional cluster, so instead they use dask-ec2 to easily provision clusters of varying sizes.
Their algorithm is written the same in all cases, drastically reducing the cognitive load, and letting the readers of their work experiment with their system on their own machines, aiding reproducibility.
Scikit-Learn or Joblib User
A data scientist wants to scale their machine learning pipeline to run on their cluster to accelerate parameter searches. They already use the n_jobs= parameter to accelerate their computation on their local computer with Joblib. Now they wrap their sklearn code with a context manager to
parallelize the exact same code across a cluster (also available with
import joblib
from dask.distributed import Client
client = Client('192.168.1.100:8786')  # connect to the running scheduler
with joblib.parallel_backend('dask'):
    result = GridSearchCV( ... )  # normal sklearn code
Academic Cluster Administrator
A system administrator for a university compute cluster wants to enable many researchers to use the available cluster resources, which are currently lying idle. The research faculty and graduate students lack experience with job schedulers and MPI, but are comfortable interacting with Python code through a Jupyter notebook.
Teaching the faculty and graduate students to parallelize software has proven time consuming. Instead the administrator sets up dask.distributed on a sandbox allocation of the cluster and broadly publishes the address of the scheduler, pointing researchers to the dask.distributed quickstart. Utilization of the cluster climbs steadily over the next week as researchers are more easily able to parallelize their computations without having to learn foreign interfaces. The administrator is happy because resources are being used without significant hand-holding.
As utilization increases, the administrator has a new problem: the shared dask.distributed cluster is being overused. The administrator tracks use through Dask diagnostics to identify which users are taking most of the resources. They contact these users and teach them how to launch their own dask.distributed clusters using the traditional job scheduler on their cluster, making space for more new users in the sandbox allocation.
Financial Modeling Team
Similar to the case above, a team of modelers working at a financial institution runs a complex network of computational models on top of each other. They started using dask.delayed individually, as suggested above, but realized that they often perform highly overlapping computations, such as always reading the same data.
Now they decide to use the same Dask cluster collaboratively to save on these costs. Because Dask intelligently hashes computations in a way similar to how Git works, they find that when two people submit similar computations the overlapping part of the computation runs only once.
Since they began working collaboratively on the same cluster, they find that their frequently running jobs run much faster, because most of the work has already been done by previous users. When they share scripts with colleagues, they find that those repeated scripts complete immediately rather than taking several hours.
They are now able to iterate and share data as a team more effectively, decreasing their time to result and increasing their competitive edge.
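The de-duplication described above rests on deterministic task keys. The sketch below shows the idea on a single machine rather than a shared cluster, which is a simplification: it assumes `dask.delayed` with `pure=True`, which derives each task's key from a hash of the function and its arguments. The `load_data` function and the file paths are hypothetical.

```python
from dask import compute, delayed

calls = []  # records every real execution of load_data


def load_data(path):
    # Hypothetical expensive loader; here it only notes that it ran.
    calls.append(path)
    return len(path)


# pure=True asks Dask to derive the key from the function and its arguments.
a = delayed(load_data, pure=True)("shared/prices.csv")
b = delayed(load_data, pure=True)("shared/prices.csv")
c = delayed(load_data, pure=True)("shared/rates.csv")

same_key = a.key == b.key        # identical call, identical key
different_key = a.key != c.key   # different argument, different key

# Tasks with the same key are merged, so the shared load runs only once.
results = compute(a, b, c, scheduler="synchronous")
```

On a shared dask.distributed cluster the same principle applies across users: a task whose key is already held by the scheduler does not need to be recomputed.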
As this becomes more heavily used on the company cluster they decide to set up an auto-scaling system. They use their dynamic job scheduler (perhaps SGE, LSF, Mesos, or Marathon) to run a single dask-scheduler 24/7 and then scale up and down the number of dask-workers running on the cluster based on computational load. This solution ends up being more responsive (and thus more heavily used) than their previous attempts to provide institution-wide access to parallel computing but because it responds to load it still acts as a good citizen in the cluster.
Streaming data engineering
A data engineer responsible for watching a data feed needs to scale out a continuous process. They combine dask.distributed with normal Python Queues to produce a rudimentary but effective stream processing system.
Because dask.distributed is elastic, they can scale up or scale down their cluster resources in response to demand.
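A rudimentary version of such a system might look like the sketch below. It is an illustration under several assumptions: an in-process cluster started with `Client(processes=False)` stands in for a real deployment, a pre-filled `queue.Queue` with a `None` sentinel stands in for the live feed, and `parse` is a hypothetical per-record function.

```python
from queue import Queue

from dask.distributed import Client


def parse(record):
    # Hypothetical per-record processing step.
    return record * 2


# Stand-in for a live feed: a queue that a producer thread would normally fill.
incoming = Queue()
for record in range(5):
    incoming.put(record)
incoming.put(None)  # sentinel marking the end of the feed in this sketch

# In-process cluster for illustration; a real deployment would pass a scheduler address.
client = Client(processes=False)

futures = []
while True:
    record = incoming.get()
    if record is None:
        break
    # Each record becomes a task as soon as it arrives.
    futures.append(client.submit(parse, record))

results = client.gather(futures)
client.close()
```

A continuously running service would drop the sentinel and push finished results onto a second queue for downstream consumers.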