Source code for dask.dataframe.io.io

from __future__ import annotations

from collections.abc import Iterable
from functools import partial
from math import ceil
from operator import getitem
from threading import Lock
from typing import TYPE_CHECKING, Literal, overload

import numpy as np
import pandas as pd

import dask.array as da
from dask.base import is_dask_collection, tokenize
from dask.blockwise import BlockwiseDepDict, blockwise
from dask.dataframe._compat import is_any_real_numeric_dtype
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.core import (
    DataFrame,
    Index,
    Series,
    _concat,
    _emulate,
    _Frame,
    apply_and_enforce,
    has_parallel_type,
    new_dd_object,
)
from dask.dataframe.dispatch import meta_lib_from_array
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.dataframe.utils import (
    check_meta,
    insert_meta_param_description,
    is_series_like,
    make_meta,
)
from dask.delayed import Delayed, delayed
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from dask.utils import M, funcname, is_arraylike

if TYPE_CHECKING:
    import distributed


lock = Lock()


def _meta_from_array(x, columns=None, index=None, meta=None):
    """Create empty DataFrame or Series which has correct dtype"""

    if x.ndim > 2:
        raise ValueError(
            "from_array does not input more than 2D array, got"
            " array with shape %r" % (x.shape,)
        )

    if index is not None:
        if not isinstance(index, Index):
            raise ValueError("'index' must be an instance of dask.dataframe.Index")
        index = index._meta

    if meta is None:
        meta = meta_lib_from_array(x).DataFrame()

    if getattr(x.dtype, "names", None) is not None:
        # record array has named columns
        if columns is None:
            columns = list(x.dtype.names)
        elif np.isscalar(columns):
            raise ValueError("For a struct dtype, columns must be a list.")
        elif not all(i in x.dtype.names for i in columns):
            extra = sorted(set(columns).difference(x.dtype.names))
            raise ValueError(f"dtype {x.dtype} doesn't have fields {extra}")
        fields = x.dtype.fields
        dtypes = [fields[n][0] if n in fields else "f8" for n in columns]
    elif x.ndim == 1:
        if np.isscalar(columns) or columns is None:
            return meta._constructor_sliced(
                [], name=columns, dtype=x.dtype, index=index
            )
        elif len(columns) == 1:
            return meta._constructor(
                np.array([], dtype=x.dtype), columns=columns, index=index
            )
        raise ValueError(
            "For a 1d array, columns must be a scalar or single element list"
        )
    else:
        if np.isnan(x.shape[1]):
            raise ValueError("Shape along axis 1 must be known")
        if columns is None:
            columns = list(range(x.shape[1])) if x.ndim == 2 else [0]
        elif len(columns) != x.shape[1]:
            raise ValueError(
                "Number of column names must match width of the array. "
                f"Got {len(columns)} names for {x.shape[1]} columns"
            )
        dtypes = [x.dtype] * len(columns)

    data = {c: np.array([], dtype=dt) for (c, dt) in zip(columns, dtypes)}
    return meta._constructor(data, columns=columns, index=index)


[docs]def from_array(x, chunksize=50000, columns=None, meta=None): """Read any sliceable array into a Dask Dataframe Uses getitem syntax to pull slices out of the array. The array need not be a NumPy array but must support slicing syntax x[50000:100000] and have 2 dimensions: x.ndim == 2 or have a record dtype: x.dtype == [('name', 'O'), ('balance', 'i8')] Parameters ---------- x : array_like chunksize : int, optional The number of rows per partition to use. columns : list or string, optional list of column names if DataFrame, single string if Series meta : object, optional An optional `meta` parameter can be passed for dask to specify the concrete dataframe type to use for partitions of the Dask dataframe. By default, pandas DataFrame is used. Returns ------- dask.DataFrame or dask.Series A dask DataFrame/Series """ if isinstance(x, da.Array): return from_dask_array(x, columns=columns, meta=meta) meta = _meta_from_array(x, columns, meta=meta) divisions = tuple(range(0, len(x), chunksize)) divisions = divisions + (len(x) - 1,) token = tokenize(x, chunksize, columns) name = "from_array-" + token dsk = {} for i in range(0, int(ceil(len(x) / chunksize))): data = (getitem, x, slice(i * chunksize, (i + 1) * chunksize)) if is_series_like(meta): dsk[name, i] = (type(meta), data, None, meta.dtype, meta.name) else: dsk[name, i] = (type(meta), data, None, meta.columns) return new_dd_object(dsk, name, meta, divisions)
@overload def from_pandas( data: pd.DataFrame, npartitions: int | None = None, chunksize: int | None = None, sort: bool = True, name: str | None = None, ) -> DataFrame: ... # We ignore this overload for now until pandas-stubs can be added. # See https://github.com/dask/dask/issues/9220 @overload def from_pandas( # type: ignore data: pd.Series, npartitions: int | None = None, chunksize: int | None = None, sort: bool = True, name: str | None = None, ) -> Series: ...
[docs]def from_pandas( data: pd.DataFrame | pd.Series, npartitions: int | None = None, chunksize: int | None = None, sort: bool = True, name: str | None = None, ) -> DataFrame | Series: """ Construct a Dask DataFrame from a Pandas DataFrame This splits an in-memory Pandas dataframe into several parts and constructs a dask.dataframe from those parts on which Dask.dataframe can operate in parallel. By default, the input dataframe will be sorted by the index to produce cleanly-divided partitions (with known divisions). To preserve the input ordering, make sure the input index is monotonically-increasing. The ``sort=False`` option will also avoid reordering, but will not result in known divisions. Note that, despite parallelism, Dask.dataframe may not always be faster than Pandas. We recommend that you stay with Pandas for as long as possible before switching to Dask.dataframe. Parameters ---------- data : pandas.DataFrame or pandas.Series The DataFrame/Series with which to construct a Dask DataFrame/Series npartitions : int, optional The number of partitions of the index to create. Note that if there are duplicate values or insufficient elements in ``data.index``, the output may have fewer partitions than requested. chunksize : int, optional The desired number of rows per index partition to use. Note that depending on the size and index of the dataframe, actual partition sizes may vary. sort: bool Sort the input by index first to obtain cleanly divided partitions (with known divisions). If False, the input will not be sorted, and all divisions will be set to None. Default is True. name: string, optional An optional keyname for the dataframe. Defaults to hashing the input Returns ------- dask.DataFrame or dask.Series A dask DataFrame/Series partitioned along the index Examples -------- >>> from dask.dataframe import from_pandas >>> df = pd.DataFrame(dict(a=list('aabbcc'), b=list(range(6))), ... index=pd.date_range(start='20100101', periods=6)) >>> ddf = from_pandas(df, npartitions=3) >>> ddf.divisions # doctest: +NORMALIZE_WHITESPACE (Timestamp('2010-01-01 00:00:00', freq='D'), Timestamp('2010-01-03 00:00:00', freq='D'), Timestamp('2010-01-05 00:00:00', freq='D'), Timestamp('2010-01-06 00:00:00', freq='D')) >>> ddf = from_pandas(df.a, npartitions=3) # Works with Series too! >>> ddf.divisions # doctest: +NORMALIZE_WHITESPACE (Timestamp('2010-01-01 00:00:00', freq='D'), Timestamp('2010-01-03 00:00:00', freq='D'), Timestamp('2010-01-05 00:00:00', freq='D'), Timestamp('2010-01-06 00:00:00', freq='D')) Raises ------ TypeError If something other than a ``pandas.DataFrame`` or ``pandas.Series`` is passed in. See Also -------- from_array : Construct a dask.DataFrame from an array that has record dtype read_csv : Construct a dask.DataFrame from a CSV file """ if isinstance(getattr(data, "index", None), pd.MultiIndex): raise NotImplementedError("Dask does not support MultiIndex Dataframes.") if not has_parallel_type(data): raise TypeError("Input must be a pandas DataFrame or Series.") if (npartitions is None) == (chunksize is None): raise ValueError("Exactly one of npartitions and chunksize must be specified.") nrows = len(data) if chunksize is None: if not isinstance(npartitions, int): raise TypeError( "Please provide npartitions as an int, or possibly as None if you specify chunksize." ) elif not isinstance(chunksize, int): raise TypeError( "Please provide chunksize as an int, or possibly as None if you specify npartitions." ) name = name or ("from_pandas-" + tokenize(data, chunksize, npartitions)) if not nrows: return new_dd_object({(name, 0): data}, name, data, [None, None]) if data.index.isna().any() and not is_any_real_numeric_dtype(data.index): raise NotImplementedError( "Index in passed data is non-numeric and contains nulls, which Dask does not entirely support.\n" "Consider passing `data.loc[~data.isna()]` instead." ) if sort: if not data.index.is_monotonic_increasing: data = data.sort_index(ascending=True) else: # sort_index copies as well data = data.copy() divisions, locations = sorted_division_locations( data.index, npartitions=npartitions, chunksize=chunksize, ) else: data = data.copy() if chunksize is None: assert isinstance(npartitions, int) chunksize = int(ceil(nrows / npartitions)) locations = list(range(0, nrows, chunksize)) + [len(data)] divisions = [None] * len(locations) dsk = { (name, i): data.iloc[start:stop] for i, (start, stop) in enumerate(zip(locations[:-1], locations[1:])) } return new_dd_object(dsk, name, data, divisions)
@dataframe_creation_dispatch.register_inplace("pandas") def from_dict( data, npartitions, orient="columns", dtype=None, columns=None, constructor=pd.DataFrame, ): """ Construct a Dask DataFrame from a Python Dictionary Parameters ---------- data : dict Of the form {field : array-like} or {field : dict}. npartitions : int The number of partitions of the index to create. Note that depending on the size and index of the dataframe, the output may have fewer partitions than requested. orient : {'columns', 'index', 'tight'}, default 'columns' The "orientation" of the data. If the keys of the passed dict should be the columns of the resulting DataFrame, pass 'columns' (default). Otherwise if the keys should be rows, pass 'index'. If 'tight', assume a dict with keys ['index', 'columns', 'data', 'index_names', 'column_names']. dtype: bool Data type to force, otherwise infer. columns: string, optional Column labels to use when ``orient='index'``. Raises a ValueError if used with ``orient='columns'`` or ``orient='tight'``. constructor: class, default pd.DataFrame Class with which ``from_dict`` should be called with. Examples -------- >>> import dask.dataframe as dd >>> ddf = dd.from_dict({"num1": [1, 2, 3, 4], "num2": [7, 8, 9, 10]}, npartitions=2) """ collection_types = {type(v) for v in data.values() if is_dask_collection(v)} if collection_types: raise NotImplementedError( "from_dict doesn't currently support Dask collections as inputs. " f"Objects of type {collection_types} were given in the input dict." ) return from_pandas( constructor.from_dict(data, orient, dtype, columns), npartitions, ) def _partition_from_array(data, index=None, initializer=None, **kwargs): """Create a Dask partition for either a DataFrame or Series. Designed to be used with :func:`dask.blockwise.blockwise`. ``data`` is the array from which the partition will be created. ``index`` can be: 1. ``None``, in which case each partition has an independent RangeIndex 2. a `tuple` with two elements, the start and stop values for a RangeIndex for this partition, which gives a continuously varying RangeIndex over the whole Dask DataFrame 3. an instance of a ``pandas.Index`` or a subclass thereof The ``kwargs`` _must_ contain an ``initializer`` key which is set by calling ``type(meta)``. """ if isinstance(index, tuple): index = pd.RangeIndex(*index) return initializer(data, index=index, **kwargs)
[docs]def from_dask_array(x, columns=None, index=None, meta=None): """Create a Dask DataFrame from a Dask Array. Converts a 2d array into a DataFrame and a 1d array into a Series. Parameters ---------- x : da.Array columns : list or string list of column names if DataFrame, single string if Series index : dask.dataframe.Index, optional An optional *dask* Index to use for the output Series or DataFrame. The default output index depends on whether `x` has any unknown chunks. If there are any unknown chunks, the output has ``None`` for all the divisions (one per chunk). If all the chunks are known, a default index with known divisions is created. Specifying `index` can be useful if you're conforming a Dask Array to an existing dask Series or DataFrame, and you would like the indices to match. meta : object, optional An optional `meta` parameter can be passed for dask to specify the concrete dataframe type to be returned. By default, pandas DataFrame is used. Examples -------- >>> import dask.array as da >>> import dask.dataframe as dd >>> x = da.ones((4, 2), chunks=(2, 2)) >>> df = dd.io.from_dask_array(x, columns=['a', 'b']) >>> df.compute() a b 0 1.0 1.0 1 1.0 1.0 2 1.0 1.0 3 1.0 1.0 See Also -------- dask.bag.to_dataframe: from dask.bag dask.dataframe._Frame.values: Reverse conversion dask.dataframe._Frame.to_records: Reverse conversion """ meta = _meta_from_array(x, columns, index, meta=meta) name = "from-dask-array-" + tokenize(x, columns) graph_dependencies = [x] arrays_and_indices = [x.name, "ij" if x.ndim == 2 else "i"] numblocks = {x.name: x.numblocks} if index is not None: # An index is explicitly given by the caller, so we can pass it through to the # initializer after a few checks. if index.npartitions != x.numblocks[0]: msg = ( "The index and array have different numbers of blocks. " "({} != {})".format(index.npartitions, x.numblocks[0]) ) raise ValueError(msg) divisions = index.divisions graph_dependencies.append(index) arrays_and_indices.extend([index._name, "i"]) numblocks[index._name] = (index.npartitions,) elif np.isnan(sum(x.shape)): # The shape of the incoming array is not known in at least one dimension. As # such, we can't create an index for the entire output DataFrame and we set # the divisions to None to represent that. divisions = [None] * (len(x.chunks[0]) + 1) else: # The shape of the incoming array is known and we don't have an explicit index. # Create a mapping of chunk number in the incoming array to # (start row, stop row) tuples. These tuples will be used to create a sequential # RangeIndex later on that is continuous over the whole DataFrame. n_elements = sum(x.chunks[0]) divisions = [0] stop = 0 index_mapping = {} for i, increment in enumerate(x.chunks[0]): stop += increment index_mapping[(i,)] = (divisions[i], stop) # last division corrected, even if there are empty chunk(s) at the end if stop == n_elements: stop -= 1 divisions.append(stop) arrays_and_indices.extend([BlockwiseDepDict(mapping=index_mapping), "i"]) if is_series_like(meta): kwargs = {"dtype": x.dtype, "name": meta.name, "initializer": type(meta)} else: kwargs = {"columns": meta.columns, "initializer": type(meta)} blk = blockwise( _partition_from_array, name, "i", *arrays_and_indices, numblocks=numblocks, concatenate=True, # kwargs passed through to the DataFrame/Series initializer **kwargs, ) graph = HighLevelGraph.from_collections(name, blk, dependencies=graph_dependencies) return new_dd_object(graph, name, meta, divisions)
def _link(token, result): """A dummy function to link results together in a graph We use this to enforce an artificial sequential ordering on tasks that don't explicitly pass around a shared resource """ return None def _df_to_bag(df, index=False, format="tuple"): if isinstance(df, pd.DataFrame): if format == "tuple": return list(map(tuple, df.itertuples(index))) elif format == "dict": if index: return [ {**{"index": idx}, **values} for values, idx in zip(df.to_dict("records"), df.index) ] else: return df.to_dict(orient="records") elif isinstance(df, pd.Series): if format == "tuple": return list(df.items()) if index else list(df) elif format == "dict": return df.to_frame().to_dict(orient="records") def to_bag(df, index=False, format="tuple"): """Create Dask Bag from a Dask DataFrame Parameters ---------- index : bool, optional If True, the elements are tuples of ``(index, value)``, otherwise they're just the ``value``. Default is False. format : {"tuple", "dict", "frame"}, optional Whether to return a bag of tuples, dictionaries, or dataframe-like objects. Default is "tuple". If "frame", the original partitions of ``df`` will not be transformed in any way. Examples -------- >>> bag = df.to_bag() # doctest: +SKIP """ from dask.bag.core import Bag if not isinstance(df, (DataFrame, Series)): raise TypeError("df must be either DataFrame or Series") name = "to_bag-" + tokenize(df, index, format) if format == "frame": # Use existing graph and name of df, but # drop meta to produce a Bag collection dsk = df.dask name = df._name else: dsk = { (name, i): (_df_to_bag, block, index, format) for (i, block) in enumerate(df.__dask_keys__()) } dsk.update(df.__dask_optimize__(df.__dask_graph__(), df.__dask_keys__())) return Bag(dsk, name, df.npartitions)
[docs]def to_records(df): """Create Dask Array from a Dask Dataframe Warning: This creates a dask.array without precise shape information. Operations that depend on shape information, like slicing or reshaping, will not work. Examples -------- >>> df.to_records() # doctest: +SKIP See Also -------- dask.dataframe._Frame.values dask.dataframe.from_dask_array """ return df.map_partitions(M.to_records)
[docs]@insert_meta_param_description def from_delayed( dfs: Delayed | distributed.Future | Iterable[Delayed | distributed.Future], meta=None, divisions: tuple | Literal["sorted"] | None = None, prefix: str = "from-delayed", verify_meta: bool = True, ) -> DataFrame | Series: """Create Dask DataFrame from many Dask Delayed objects Parameters ---------- dfs : A ``dask.delayed.Delayed``, a ``distributed.Future``, or an iterable of either of these objects, e.g. returned by ``client.submit``. These comprise the individual partitions of the resulting dataframe. If a single object is provided (not an iterable), then the resulting dataframe will have only one partition. $META divisions : Partition boundaries along the index. For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions For string 'sorted' will compute the delayed values to find index values. Assumes that the indexes are mutually sorted. If None, then won't use index information prefix : Prefix to prepend to the keys. verify_meta : If True check that the partitions have consistent metadata, defaults to True. """ from dask.delayed import Delayed if isinstance(dfs, Delayed) or hasattr(dfs, "key"): dfs = [dfs] dfs = [ delayed(df) if not isinstance(df, Delayed) and hasattr(df, "key") else df for df in dfs ] for item in dfs: if not isinstance(item, Delayed): raise TypeError("Expected Delayed object, got %s" % type(item).__name__) if meta is None: meta = delayed(make_meta)(dfs[0]).compute() else: meta = make_meta(meta) if not dfs: dfs = [delayed(make_meta)(meta)] if divisions is None or divisions == "sorted": divs: list | tuple = [None] * (len(dfs) + 1) else: divs = list(divisions) if len(divs) != len(dfs) + 1: raise ValueError("divisions should be a tuple of len(dfs) + 1") name = prefix + "-" + tokenize(*dfs) layer = DataFrameIOLayer( name=name, columns=None, inputs=BlockwiseDepDict( {(i,): inp.key for i, inp in enumerate(dfs)}, produces_keys=True, ), io_func=partial(check_meta, meta=meta, funcname="from_delayed") if verify_meta else lambda x: x, ) df = new_dd_object( HighLevelGraph.from_collections(name, layer, dfs), name, meta, divs ) if divisions == "sorted": from dask.dataframe.shuffle import compute_and_set_divisions return compute_and_set_divisions(df) return df
def sorted_division_locations(seq, npartitions=None, chunksize=None): """Find division locations and values in sorted list Examples -------- >>> L = ['A', 'B', 'C', 'D', 'E', 'F'] >>> sorted_division_locations(L, chunksize=2) (['A', 'C', 'E', 'F'], [0, 2, 4, 6]) >>> sorted_division_locations(L, chunksize=3) (['A', 'D', 'F'], [0, 3, 6]) >>> L = ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'C'] >>> sorted_division_locations(L, chunksize=3) (['A', 'B', 'C', 'C'], [0, 4, 7, 8]) >>> sorted_division_locations(L, chunksize=2) (['A', 'B', 'C', 'C'], [0, 4, 7, 8]) >>> sorted_division_locations(['A'], chunksize=2) (['A', 'A'], [0, 1]) """ if (npartitions is None) == (chunksize is None): raise ValueError("Exactly one of npartitions and chunksize must be specified.") # Find unique-offset array (if duplicates exist). # Note that np.unique(seq) should work in all cases # for newer versions of numpy/pandas seq_unique = seq.unique() if hasattr(seq, "unique") else np.unique(seq) duplicates = len(seq_unique) < len(seq) enforce_exact = False if duplicates: offsets = ( # Avoid numpy conversion (necessary for dask-cudf) seq.searchsorted(seq_unique, side="left") if hasattr(seq, "searchsorted") else np.array(seq).searchsorted(seq_unique, side="left") ) enforce_exact = npartitions and len(offsets) >= npartitions else: offsets = seq_unique = None # Define chunksize and residual so that # npartitions can be exactly satisfied # when duplicates is False residual = 0 subtract_drift = False if npartitions: chunksize = len(seq) // npartitions residual = len(seq) % npartitions subtract_drift = True def chunksizes(ind): # Helper function to satisfy npartitions return chunksize + int(ind < residual) # Always start with 0th item in seqarr, # and then try to take chunksize steps # along the seqarr array divisions = [seq[0]] locations = [0] i = chunksizes(0) ind = None # ind cache (sometimes avoids nonzero call) drift = 0 # accumulated drift away from ideal chunksizes divs_remain = npartitions - len(divisions) if enforce_exact else None while i < len(seq): # Map current position selection (i) # to the corresponding division value (div) div = seq[i] # pos is the position of the first occurrence of # div (which is i when seq has no duplicates) if duplicates: # Note: cupy requires casts to `int` below if ind is None: ind = int((seq_unique == seq[i]).nonzero()[0][0]) if enforce_exact: # Avoid "over-stepping" too many unique # values when npartitions is approximately # equal to len(offsets) offs_remain = len(offsets) - ind if divs_remain > offs_remain: ind -= divs_remain - offs_remain i = offsets[ind] div = seq[i] pos = int(offsets[ind]) else: pos = i if div <= divisions[-1]: # pos overlaps with divisions. # Try the next element on the following pass if duplicates: ind += 1 # Note: cupy requires cast to `int` i = int(offsets[ind]) if ind < len(offsets) else len(seq) else: i += 1 else: # pos does not overlap with divisions. # Append candidate pos/div combination, and # take another chunksize step if subtract_drift: # Only subtract drift when user specified npartitions drift = drift + ((pos - locations[-1]) - chunksizes(len(divisions) - 1)) if enforce_exact: divs_remain -= 1 i = pos + max(1, chunksizes(len(divisions)) - drift) divisions.append(div) locations.append(pos) ind = None # The final element of divisions/locations # will always be the same divisions.append(seq[-1]) locations.append(len(seq)) return divisions, locations class _PackedArgCallable(DataFrameIOFunction): """Packed-argument wrapper for DataFrameIOFunction This is a private helper class for ``from_map``. This class ensures that packed positional arguments will be expanded before the underlying function (``func``) is called. This class also handles optional metadata enforcement and column projection (when ``func`` satisfies the ``DataFrameIOFunction`` protocol). """ def __init__( self, func, args=None, kwargs=None, meta=None, packed=False, enforce_metadata=False, ): self.func = func self.args = args self.kwargs = kwargs self.meta = meta self.enforce_metadata = enforce_metadata self.packed = packed self.is_dataframe_io_func = isinstance(self.func, DataFrameIOFunction) @property def columns(self): if self.is_dataframe_io_func: return self.func.columns return None def project_columns(self, columns): if self.is_dataframe_io_func: return _PackedArgCallable( self.func.project_columns(columns), args=self.args, kwargs=self.kwargs, meta=self.meta[columns], packed=self.packed, enforce_metadata=self.enforce_metadata, ) return self def __call__(self, packed_arg): if not self.packed: packed_arg = [packed_arg] if self.enforce_metadata: return apply_and_enforce( *packed_arg, *(self.args or []), _func=self.func, _meta=self.meta, **(self.kwargs or {}), ) return self.func( *packed_arg, *(self.args or []), **(self.kwargs or {}), )
[docs]@insert_meta_param_description def from_map( func, *iterables, args=None, meta=None, divisions=None, label=None, token=None, enforce_metadata=True, **kwargs, ): """Create a DataFrame collection from a custom function map WARNING: The ``from_map`` API is experimental, and stability is not yet guaranteed. Use at your own risk! Parameters ---------- func : callable Function used to create each partition. If ``func`` satisfies the ``DataFrameIOFunction`` protocol, column projection will be enabled. *iterables : Iterable objects Iterable objects to map to each output partition. All iterables must be the same length. This length determines the number of partitions in the output collection (only one element of each iterable will be passed to ``func`` for each partition). args : list or tuple, optional Positional arguments to broadcast to each output partition. Note that these arguments will always be passed to ``func`` after the ``iterables`` positional arguments. $META divisions : tuple, str, optional Partition boundaries along the index. For tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions For string 'sorted' will compute the delayed values to find index values. Assumes that the indexes are mutually sorted. If None, then won't use index information label : str, optional String to use as the function-name label in the output collection-key names. token : str, optional String to use as the "token" in the output collection-key names. enforce_metadata : bool, default True Whether to enforce at runtime that the structure of the DataFrame produced by ``func`` actually matches the structure of ``meta``. This will rename and reorder columns for each partition, and will raise an error if this doesn't work, but it won't raise if dtypes don't match. **kwargs: Key-word arguments to broadcast to each output partition. These same arguments will be passed to ``func`` for every output partition. Examples -------- >>> import pandas as pd >>> import dask.dataframe as dd >>> func = lambda x, size=0: pd.Series([x] * size) >>> inputs = ["A", "B"] >>> dd.from_map(func, inputs, size=2).compute() 0 A 1 A 0 B 1 B dtype: object This API can also be used as an alternative to other file-based IO functions, like ``read_parquet`` (which are already just ``from_map`` wrapper functions): >>> import pandas as pd >>> import dask.dataframe as dd >>> paths = ["0.parquet", "1.parquet", "2.parquet"] >>> dd.from_map(pd.read_parquet, paths).head() # doctest: +SKIP name timestamp 2000-01-01 00:00:00 Laura 2000-01-01 00:00:01 Oliver 2000-01-01 00:00:02 Alice 2000-01-01 00:00:03 Victor 2000-01-01 00:00:04 Bob Since ``from_map`` allows you to map an arbitrary function to any number of iterable objects, it can be a very convenient means of implementing functionality that may be missing from from other DataFrame-creation methods. For example, if you happen to have apriori knowledge about the number of rows in each of the files in a dataset, you can generate a DataFrame collection with a global RangeIndex: >>> import pandas as pd >>> import numpy as np >>> import dask.dataframe as dd >>> paths = ["0.parquet", "1.parquet", "2.parquet"] >>> file_sizes = [86400, 86400, 86400] >>> def func(path, row_offset): ... # Read parquet file and set RangeIndex offset ... df = pd.read_parquet(path) ... return df.set_index( ... pd.RangeIndex(row_offset, row_offset+len(df)) ... ) >>> def get_ddf(paths, file_sizes): ... offsets = [0] + list(np.cumsum(file_sizes)) ... return dd.from_map( ... func, paths, offsets[:-1], divisions=offsets ... ) >>> ddf = get_ddf(paths, file_sizes) # doctest: +SKIP >>> ddf.index # doctest: +SKIP Dask Index Structure: npartitions=3 0 int64 86400 ... 172800 ... 259200 ... dtype: int64 Dask Name: myfunc, 6 tasks See Also -------- dask.dataframe.from_delayed dask.layers.DataFrameIOLayer """ # Input validation if not callable(func): raise ValueError("`func` argument must be `callable`") lengths = set() iterables = list(iterables) for i, iterable in enumerate(iterables): if not isinstance(iterable, Iterable): raise ValueError( f"All elements of `iterables` must be Iterable, got {type(iterable)}" ) try: lengths.add(len(iterable)) except (AttributeError, TypeError): iterables[i] = list(iterable) lengths.add(len(iterables[i])) if len(lengths) == 0: raise ValueError("`from_map` requires at least one Iterable input") elif len(lengths) > 1: raise ValueError("All `iterables` must have the same length") if lengths == {0}: raise ValueError("All `iterables` must have a non-zero length") # Check for `produces_tasks` and `creation_info`. # These options are included in the function signature, # because they are not intended for "public" use. produces_tasks = kwargs.pop("produces_tasks", False) creation_info = kwargs.pop("creation_info", None) if produces_tasks or len(iterables) == 1: if len(iterables) > 1: # Tasks are not detected correctly when they are "packed" # within an outer list/tuple raise ValueError( "Multiple iterables not supported when produces_tasks=True" ) inputs = iterables[0] packed = False else: inputs = list(zip(*iterables)) packed = True # Define collection name label = label or funcname(func) token = token or tokenize( func, meta, inputs, args, divisions, enforce_metadata, **kwargs ) name = f"{label}-{token}" # Get "projectable" column selection. # Note that this relies on the IO function # ducktyping with DataFrameIOFunction column_projection = func.columns if isinstance(func, DataFrameIOFunction) else None # NOTE: Most of the metadata-handling logic used here # is copied directly from `map_partitions` if meta is None: meta = _emulate( func, *(inputs[0] if packed else inputs[:1]), *(args or []), udf=True, **kwargs, ) meta_is_emulated = True else: meta = make_meta(meta) meta_is_emulated = False if not (has_parallel_type(meta) or is_arraylike(meta) and meta.shape): if not meta_is_emulated: raise TypeError( "Meta is not valid, `from_map` expects output to be a pandas object. " "Try passing a pandas object as meta or a dict or tuple representing the " "(name, dtype) of the columns." ) # If `meta` is not a pandas object, the concatenated results will be a # different type meta = make_meta(_concat([meta])) # Ensure meta is empty DataFrame meta = make_meta(meta) # Define io_func if packed or args or kwargs or enforce_metadata: io_func = _PackedArgCallable( func, args=args, kwargs=kwargs, meta=meta if enforce_metadata else None, enforce_metadata=enforce_metadata, packed=packed, ) else: io_func = func # Construct DataFrameIOLayer layer = DataFrameIOLayer( name, column_projection, inputs, io_func, label=label, produces_tasks=produces_tasks, creation_info=creation_info, ) # Return new DataFrame-collection object divisions = divisions or [None] * (len(inputs) + 1) graph = HighLevelGraph.from_collections(name, layer, dependencies=[]) return new_dd_object(graph, name, meta, divisions)
def to_backend(ddf: _Frame, backend: str | None = None, **kwargs): """Move a DataFrame collection to a new backend Parameters ---------- ddf : DataFrame, Series, or Index The input dataframe collection. backend : str, Optional The name of the new backend to move to. The default is the current "dataframe.backend" configuration. Returns ------- dask.DataFrame, dask.Series or dask.Index A new dataframe collection with the backend specified by ``backend``. """ # Get desired backend backend = backend or dataframe_creation_dispatch.backend # Check that "backend" has a registered entrypoint backend_entrypoint = dataframe_creation_dispatch.dispatch(backend) # Call `DataFrameBackendEntrypoint.to_backend` return backend_entrypoint.to_backend(ddf, **kwargs) DataFrame.to_records.__doc__ = to_records.__doc__ DataFrame.to_bag.__doc__ = to_bag.__doc__