dask.dataframe.from_map

dask.dataframe.from_map(func, *iterables, args=None, meta=None, divisions=None, label=None, token=None, enforce_metadata=True, **kwargs)

Create a DataFrame collection from a custom function map

WARNING: The from_map API is experimental, and stability is not yet guaranteed. Use at your own risk!

Parameters
func : callable

Function used to create each partition. If func satisfies the DataFrameIOFunction protocol, column projection will be enabled (see the sketch after this parameter list).

*iterables : Iterable objects

Iterable objects to map to each output partition. All iterables must be the same length. This length determines the number of partitions in the output collection (only one element of each iterable will be passed to func for each partition).

args : list or tuple, optional

Positional arguments to broadcast to each output partition. Note that these arguments will always be passed to func after the iterables positional arguments.

meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional

An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in Dask DataFrame to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or an iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a Series, a tuple of (name, dtype) can be used. If not provided, Dask will try to infer the metadata; this may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.

divisions : tuple, str, optional

Partition boundaries along the index. For a tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions. For the string 'sorted', the delayed values will be computed to find the index values; this assumes that the indexes are mutually sorted. If None, index information will not be used.

label : str, optional

String to use as the function-name label in the output collection-key names.

token : str, optional

String to use as the “token” in the output collection-key names.

enforce_metadata : bool, default True

Whether to enforce at runtime that the structure of the DataFrame produced by func actually matches the structure of meta. This will rename and reorder columns for each partition, and will raise an error if this doesn’t work, but it won’t raise if dtypes don’t match.

**kwargs:

Keyword arguments to broadcast to each output partition. These same arguments will be passed to func for every output partition.
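
The column-projection hook mentioned under func can be sketched as follows. This is a minimal, hypothetical reader that satisfies the DataFrameIOFunction protocol (a callable that also exposes a columns property and a project_columns method). The class name ParquetReader and the import path dask.dataframe.io.utils are assumptions for illustration, not a guaranteed public API:

>>> import pandas as pd
>>> from dask.dataframe.io.utils import DataFrameIOFunction  # assumed import path
>>> class ParquetReader(DataFrameIOFunction):
...     """Hypothetical reader that lets from_map push column selections
...     down into the task that creates each partition."""
...     def __init__(self, columns=None):
...         self._columns = columns
...     @property
...     def columns(self):
...         # Columns this function will materialize (None means all of them)
...         return self._columns
...     def project_columns(self, columns):
...         # Return a new reader restricted to the requested columns
...         return ParquetReader(columns=columns)
...     def __call__(self, path):
...         # Called once per output partition with one element of *iterables
...         return pd.read_parquet(path, columns=self._columns)

With a reader like this, a selection such as dd.from_map(ParquetReader(), paths)["name"] can avoid materializing columns that are never used.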

See also

dask.dataframe.from_delayed
dask.layers.DataFrameIOLayer

Examples

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> func = lambda x, size=0: pd.Series([x] * size)
>>> inputs = ["A", "B"]
>>> dd.from_map(func, inputs, size=2).compute()
0    A
1    A
0    B
1    B
dtype: object
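
The optional args and meta parameters work the same way as kwargs in the example above, except that args values are passed positionally after the per-partition elements and meta skips metadata inference. A minimal sketch, in which the helper make_part and the column names are purely illustrative:

>>> def make_part(letter, repeat, fill="x"):
...     # letter comes from the iterable, repeat from args, fill from **kwargs
...     return pd.DataFrame({"letter": [letter] * repeat, "fill": [fill] * repeat})
>>> meta = pd.DataFrame({"letter": pd.Series(dtype="object"), "fill": pd.Series(dtype="object")})
>>> dd.from_map(make_part, ["A", "B"], args=[2], meta=meta, fill="y").compute()
  letter fill
0      A    y
1      A    y
0      B    y
1      B    y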

This API can also be used as an alternative to other file-based IO functions, like read_parquet (which is itself just a from_map wrapper function):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> paths = ["0.parquet", "1.parquet", "2.parquet"]
>>> dd.from_map(pd.read_parquet, paths).head()  
                    name
timestamp
2000-01-01 00:00:00   Laura
2000-01-01 00:00:01  Oliver
2000-01-01 00:00:02   Alice
2000-01-01 00:00:03  Victor
2000-01-01 00:00:04     Bob

Since from_map allows you to map an arbitrary function to any number of iterable objects, it can be a very convenient means of implementing functionality that may be missing from other DataFrame-creation methods. For example, if you happen to have a priori knowledge about the number of rows in each of the files in a dataset, you can generate a DataFrame collection with a global RangeIndex:

>>> import pandas as pd
>>> import numpy as np
>>> import dask.dataframe as dd
>>> paths = ["0.parquet", "1.parquet", "2.parquet"]
>>> file_sizes = [86400, 86400, 86400]
>>> def func(path, row_offset):
...     # Read parquet file and set RangeIndex offset
...     df = pd.read_parquet(path)
...     return df.set_index(
...         pd.RangeIndex(row_offset, row_offset+len(df))
...     )
>>> def get_ddf(paths, file_sizes):
...     offsets = [0] + list(np.cumsum(file_sizes))
...     return dd.from_map(
...         func, paths, offsets[:-1], divisions=offsets
...     )
>>> ddf = get_ddf(paths, file_sizes)  
>>> ddf.index  
Dask Index Structure:
npartitions=3
0         int64
86400       ...
172800      ...
259200      ...
dtype: int64
Dask Name: func, 6 tasks