Create Dask Bags

There are several ways to create Dask Bags from your data:

db.from_sequence

You can create a bag from an existing Python iterable:

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])

You can control the number of partitions into which this data is binned:

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

This controls the granularity of the parallelism that you expose. By default dask will try to partition your data into about 100 partitions.

IMPORTANT: do not load your data into Python and then load that data into dask.bag. Instead, use dask.bag to load your data. This parallelizes the loading step and reduces inter-worker communication:

>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)

db.read_text

Dask.bag can load data directly from textfiles. You can pass either a single filename, a list of filenames, or a globstring. The resulting bag will have one item per line, one file per partition:

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

This handles standard compression formats like gzip, bz2, and xz, as well as any easily installed compression library that provides a file-like interface. Compression will be inferred from the filename extension, or you can specify it explicitly with the compression='gzip' keyword:

>>> b = db.read_text('myfile.*.txt.gz')
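
If the filename extension does not indicate the compression, you can pass the compression keyword explicitly. A minimal sketch (the path here is a placeholder):

>>> b = db.read_text('myfile.*.txt', compression='gzip')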

The resulting items in the bag are strings. If you have encoded data like line-delimited JSON then you may want to map a decoding or load function across the bag:

>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)

Or perform string munging tasks. For convenience, there is a string namespace attached directly to bags, accessed with .str.methodname:

>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')

db.from_delayed

You can construct a dask bag from dask.delayed values using the db.from_delayed function. See documentation on using dask.delayed with collections for more information.
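
For example, a minimal sketch of wrapping delayed loads into a bag (load_from_filename here is a placeholder for your own loading function, and each delayed call is assumed to return the list of elements for one partition):

>>> from dask import delayed
>>> partitions = [delayed(load_from_filename)(fn)
...               for fn in ['1.dat', '2.dat']]
>>> b = db.from_delayed(partitions)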

Store Dask Bags

In Memory

You can convert a dask bag into a list or other Python iterable by calling compute() or by converting the object into a list:

>>> result = b.compute()

or

>>> result = list(b)

To Textfiles

You can convert a dask bag into a sequence of files on disk by calling the .to_textfiles() method:

dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, get=None, storage_options=None)

Write bag to disk, one filename per partition, one line per element

Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.

Use a globstring

>>> b.to_textfiles('/path/to/data/*.json.gz')  

The * will be replaced by the increasing sequence 0, 1, ...

/path/to/data/0.json.gz
/path/to/data/1.json.gz

Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string. Strings produced by name_function must preserve the order of their respective partition indices.

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

You can also provide an explicit list of paths.

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.

Bag Contents: The bag calling to_textfiles must be a bag of text strings. For example, a bag of dictionaries could be written to JSON text files by mapping json.dumps onto the bag first and then calling to_textfiles:

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  

To DataFrames

You can convert a dask bag into a dask dataframe and then use dask.dataframe's storage solutions.

Bag.to_dataframe(meta=None, columns=None)

Create Dask Dataframe from a Dask Bag

Bag should contain tuples, dict records, or scalars.

Index will not be particularly meaningful. Use reindex afterwards if necessary.

Parameters:

meta : pd.DataFrame, dict, iterable, optional

An empty pd.DataFrame that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided. If not provided or a list, a single element from the first partition will be computed, triggering a potentially expensive call to compute. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.

columns : sequence, optional

Column names to use. If the passed data do not have names associated with them, this argument provides names for the columns. Otherwise this argument indicates the order of the columns in the result (any names not found in the data will become all-NA columns). Note that if meta is provided, column names will be taken from there and this parameter is invalid.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
   balance     name
0      100    Alice
1      200      Bob
0      300  Charlie
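
If you want to avoid the metadata inference step (which computes one element from the first partition), you can pass meta explicitly, for example as a dict of column names to dtypes. A small sketch continuing the example above; the output path is a placeholder:

>>> df = b.to_dataframe(meta={'name': str, 'balance': int})
>>> df.to_csv('/path/to/data/*.csv')  # then use dask.dataframe storage methods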

To Delayed Values

You can convert a dask bag into a list of dask delayed values and build custom storage solutions from there.

Bag.to_delayed()

Convert bag to list of dask Delayed

Returns list of Delayed, one per partition.
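
A small sketch of consuming those partitions with a custom storage function (write_partition here is a hypothetical function you would supply yourself):

>>> from dask import compute, delayed
>>> parts = b.to_delayed()
>>> tasks = [delayed(write_partition)(part, i) for i, part in enumerate(parts)]
>>> compute(*tasks)  # executes the writes in parallel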