dask.bag.Bag.to_avro

Bag.to_avro(filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)

Write bag to a set of Avro files.

The schema is a complex dictionary describing the data; see https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema and https://fastavro.readthedocs.io/en/latest/writer.html . Its structure is as follows:

{'name': 'Test',
 'namespace': 'Test',
 'doc': 'Descriptive text',
 'type': 'record',
 'fields': [
    {'name': 'a', 'type': 'int'},
 ]}

where the “name” field is required, but “namespace” and “doc” are optional descriptors; “type” must always be “record”. The list of fields should have an entry for every key of the input records, and the types follow the primitive, complex, or logical types of the Avro spec ( https://avro.apache.org/docs/1.8.2/spec.html ).
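For example, a schema can combine primitive, complex, and logical types. The following sketch (field names are illustrative) declares a nullable string via a union type and a millisecond timestamp via a logical type:

{'name': 'Events',
 'namespace': 'Example',
 'doc': 'Schema mixing complex and logical types',
 'type': 'record',
 'fields': [
    {'name': 'label', 'type': ['null', 'string']},
    {'name': 'when',
     'type': {'type': 'long', 'logicalType': 'timestamp-millis'}},
 ]}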

Results in one avro file per input partition.

Parameters
b: dask.bag.Bag

The bag to write.

filename: list of str or str

Filenames to write to. If a list, its length must match the number of partitions. If a string, it must include the glob character “*”, which will be expanded using name_function

schema: dict

Avro schema dictionary, see above

name_function: None or callable

Expands integers into strings; see dask.bytes.utils.build_name_function and the sketch after this parameter list

storage_options: None or dict

Extra key/value options to pass to the backend file-system

codec: ‘null’, ‘deflate’, or ‘snappy’

Compression algorithm

sync_interval: int

Number of records to include in each block within a file

metadata: None or dict

Included in the file header

compute: bool

If True, files are written immediately and the function blocks until writing completes. If False, returns delayed objects, which the user can compute when convenient (see the sketch after this parameter list).

kwargs: passed to compute(), if compute=True
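As a sketch of how name_function and compute=False interact (assuming fastavro is installed; the records, schema, and file names below are illustrative):

import dask.bag as db
from dask import compute

b = db.from_sequence([{'value': i} for i in range(4)], npartitions=2)
schema = {'name': 'Values', 'type': 'record',
          'fields': [{'name': 'value', 'type': 'int'}]}

# name_function maps each partition index to the string that replaces
# the "*" in the filename: 0 -> "part-00", 1 -> "part-01".
delayed_writes = b.to_avro('values.*.avro', schema,
                           name_function=lambda i: f'part-{i:02d}',
                           compute=False)

# Nothing has been written yet; trigger the writes when convenient.
compute(*delayed_writes)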

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> schema = {'name': 'People', 'doc': "Set of people's scores",
...           'type': 'record',
...           'fields': [
...               {'name': 'name', 'type': 'string'},
...               {'name': 'value', 'type': 'int'}]}
>>> b.to_avro('my-data.*.avro', schema)  
['my-data.0.avro', 'my-data.1.avro']
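
The written files can be read back into a bag with dask.bag.read_avro (a sketch, assuming fastavro is installed; the count reflects the two records written above):

>>> b2 = db.read_avro('my-data.*.avro')  
>>> b2.count().compute()  
2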