dask.dataframe.DataFrame.map_partitions
- DataFrame.map_partitions(func, *args, meta=<no_default>, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, parent_meta=None, required_columns=None, **kwargs)
Apply a Python function to each partition.
- Parameters:
- func : function
Function applied to each partition.
- args, kwargs
Arguments and keywords to pass to the function. Arguments and keywords may contain `FrameBase` or regular Python objects. DataFrame-like args (both dask and pandas) must have the same number of partitions as `self` or comprise a single partition. Keyword arguments, single-partition arguments, and general Python-object arguments are broadcast to all partitions.
- enforce_metadata : bool, default True
Whether to enforce at runtime that the structure of the DataFrame produced by `func` actually matches the structure of `meta`. This renames and reorders columns for each partition and raises an error if that is not possible, but it does not raise if dtypes don't match.
- transform_divisions : bool, default True
Whether to apply the function to the divisions and use the transformed divisions for the output.
- clear_divisions : bool, default False
Whether divisions should be cleared. If True, `transform_divisions` is ignored.
- required_columns : list or None, default None
List of columns that `func` requires for execution. These columns must belong to the first DataFrame argument (in `args`). If None (the default), the query optimizer assumes that all input columns are required.
- meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional
An empty `pd.DataFrame` or `pd.Series` that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a `DataFrame`, a `dict` of `{name: dtype}` or an iterable of `(name, dtype)` can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of `(name, dtype)` can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing `meta` is recommended. For more information, see `dask.dataframe.utils.make_meta`.
Examples
Given a DataFrame, Series, or Index, such as:
```python
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
...                    'y': [1., 2., 3., 4., 5.]})
>>> ddf = dd.from_pandas(df, npartitions=2)
```
One can use `map_partitions` to apply a function on each partition. Extra arguments and keywords can optionally be provided, and will be passed to the function after the partition.
Here we apply a function with arguments and keywords to a DataFrame, resulting in a Series:
```python
>>> def myadd(df, a, b=1):
...     return df.x + df.y + a + b
>>> res = ddf.map_partitions(myadd, 1, b=2)
>>> res.dtype
dtype('float64')
```
Here we apply a function to a Series, resulting in a Series:
```python
>>> res = ddf.x.map_partitions(lambda x: len(x))  # ddf.x is a Dask Series structure
>>> res.dtype
dtype('int64')
```
By default, dask tries to infer the output metadata by running your provided function on some fake data. This works well in many cases, but can sometimes be expensive, or even fail. To avoid this, you can manually specify the output metadata with the `meta` keyword. This can be specified in many forms; for more information, see `dask.dataframe.utils.make_meta`.
Here we specify that the output is a Series with no name and dtype `float64`:
```python
>>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))
```
Here we map a function that takes in a DataFrame and returns a DataFrame with a new column:
```python
>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
>>> res.dtypes
x      int64
y    float64
z    float64
dtype: object
```
As before, the output metadata can also be specified manually. This time we pass in a `dict`, as the output is a DataFrame:
```python
>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y),
...                          meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})
```
In the case where the metadata doesn’t change, you can also pass in the object itself directly:
```python
>>> res = ddf.map_partitions(lambda df: df.head(), meta=ddf)
```
Also note that the index and divisions are assumed to remain unchanged. If the function you’re mapping changes the index/divisions, you’ll need to pass `clear_divisions=True`.
```python
>>> ddf.map_partitions(func, clear_divisions=True)
```
Your map function gets information about where it is in the dataframe by accepting a special `partition_info` keyword argument.
```python
>>> def func(partition, partition_info=None):
...     pass
```
This will receive the following information:
```python
>>> partition_info
{'number': 1, 'division': 3}
```
For each argument and keyword argument that is a dask dataframe, you will receive the number (n), which identifies the nth partition of the dataframe, and the division (the first index value in the partition). If divisions are not known (for instance, if the index is not sorted), you will get None as the division.