Create Dask Bags

There are several ways to create Dask Bags around your data:

db.from_sequence

You can create a bag from an existing Python iterable:

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])

You can control the number of partitions into which this data is binned:

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

This controls the granularity of the parallelism that you expose. By default dask will try to partition your data into about 100 partitions.

IMPORTANT: do not load your data into Python and then load that data into dask.bag. Instead, use dask.bag to load your data. This parallelizes the loading step and reduces inter-worker communication:

>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
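
For illustration, here is a minimal sketch of what such a loading function might look like (the file names and the load_from_filename parser are hypothetical):

>>> def load_from_filename(fn):
...     # Hypothetical loader: parse one .dat file into a list of records
...     with open(fn) as f:
...         return [line.split() for line in f]
>>> filenames = ['1.dat', '2.dat', '3.dat']
>>> b = db.from_sequence(filenames).map(load_from_filename)

Each worker opens and parses its own files, so no large objects need to be shipped from the client process.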

db.read_text

Dask.bag can load data directly from text files. You can pass either a single filename, a list of filenames, or a globstring. The resulting bag will have one item per line and one file per partition:

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

This handles standard compression libraries like gzip, bz2, and xz, or any easily installed compression library that exposes a file-like object. Compression will be inferred from the filename extension, or can be specified explicitly with the compression='gzip' keyword:

>>> b = db.read_text('myfile.*.txt.gz')

The resulting items in the bag are strings. If you have encoded data like JSON then you may want to map a decoding or load function across the bag:

>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)

Or perform string munging tasks. For convenience, there is a string namespace attached directly to bags with .str.methodname:

>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')

db.from_delayed

You can construct a dask bag from dask.delayed values using the db.from_delayed function. See documentation on using dask.delayed with collections for more information.
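
For example, a minimal sketch that builds one partition per delayed value (the file names and the load function below are hypothetical):

>>> import dask
>>> import dask.bag as db
>>> @dask.delayed
... def load(path):
...     # Hypothetical loader: each call produces the elements of one partition
...     with open(path) as f:
...         return f.readlines()
>>> partitions = [load(p) for p in ['a.txt', 'b.txt', 'c.txt']]
>>> b = db.from_delayed(partitions)

Each delayed value should compute to a list (or other sequence) of elements; that sequence becomes one partition of the resulting bag.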

Store Dask Bags

In Memory

You can convert a dask bag to a list or other Python iterable by calling compute() or by converting the object into a list:

>>> result = b.compute()
or
>>> result = list(b)

To Textfiles

You can convert a dask bag into a sequence of files on disk by calling the .to_textfiles() method:

dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True)

Write bag to disk, one filename per partition, one line per element

Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.

Use a globstring

>>> b.to_textfiles('/path/to/data/*.json.gz')  

The * will be replaced by the increasing sequence 0, 1, ...

/path/to/data/0.json.gz
/path/to/data/1.json.gz

Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string. Strings produced by name_function must preserve the order of their respective partition indices.

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

You can also provide an explicit list of paths.

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 

Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.

Bag Contents: The bag calling to_textfiles must be a bag of text strings. For example, a bag of dictionaries could be written to JSON text files by mapping json.dumps onto the bag first and then calling to_textfiles:

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  

To DataFrames

You can convert a dask bag into a dask dataframe and use the dataframe storage solutions from there.

Bag.to_dataframe(columns=None)

Create Dask Dataframe from a Dask Bag

Bag should contain tuples, dict records, or scalars.

Index will not be particularly meaningful. Use reindex afterwards if necessary.

Parameters:

columns : pandas.DataFrame or list, optional

If a pandas.DataFrame, it should mirror the column names and dtypes of the output dataframe. If a list, it provides the desired column names. If not provided, or if a list is given, a single element from the first partition will be computed, triggering a potentially expensive call to compute. Providing a list is only useful for selecting a subset of columns; to avoid an internal compute call, you must provide a pandas.DataFrame, as dask requires dtype knowledge ahead of time.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
   balance     name
0      100    Alice
1      200      Bob
0      300  Charlie
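
To avoid the internal compute call described above, you can pass an empty pandas.DataFrame that mirrors the expected column names and dtypes (a minimal sketch, assuming the columns= signature shown above; the dtypes are illustrative):

>>> import pandas as pd
>>> meta = pd.DataFrame({'name': pd.Series([], dtype='object'),
...                      'balance': pd.Series([], dtype='int64')})
>>> df = b.to_dataframe(columns=meta)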

To Delayed Values

You can convert a dask bag into a list of dask delayed values and build custom storage solutions from there.

Bag.to_delayed()

Convert bag to list of dask Delayed

Returns list of Delayed, one per partition.
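
For example, a minimal sketch that counts the elements in each partition through the delayed interface, reusing the three-record bag from the example above (its two partitions hold 2 and 1 elements):

>>> from dask import delayed, compute
>>> partitions = b.to_delayed()
>>> counts = [delayed(len)(part) for part in partitions]
>>> compute(*counts)
(2, 1)

Each Delayed object computes to the list of elements in its partition, so any delayed function can consume it directly.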