Source code for dask.dataframe.io.hdf

from __future__ import annotations

import os
import uuid
from fnmatch import fnmatch
from glob import glob
from warnings import warn

import pandas as pd
from fsspec.utils import build_name_function, stringify_path
from tlz import merge

from dask import config
from dask.base import (
    compute_as_if_collection,
    get_scheduler,
    named_schedulers,
    tokenize,
)
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.core import DataFrame, Scalar
from dask.dataframe.io.io import _link, from_map
from dask.dataframe.io.utils import DataFrameIOFunction, SupportsLock
from dask.highlevelgraph import HighLevelGraph
from dask.utils import get_scheduler_lock

# The "processes" scheduler entry, if one is registered.  When it is absent
# the fallback is a fresh ``object()``, so identity checks against ``MP_GET``
# (see the lock-selection logic in ``to_hdf``) can never match by accident.
MP_GET = named_schedulers.get("processes", object())


def _pd_to_hdf(pd_to_hdf, lock, args, kwargs=None):
    """A wrapper function around pd_to_hdf that enables locking"""

    if lock:
        lock.acquire()
    try:
        pd_to_hdf(*args, **kwargs)
    finally:
        if lock:
            lock.release()

    return None


def to_hdf(
    df,
    path,
    key,
    mode="a",
    append=False,
    scheduler=None,
    name_function=None,
    compute=True,
    lock=None,
    dask_kwargs=None,
    **kwargs,
):
    """Store Dask Dataframe to Hierarchical Data Format (HDF) files

    This is a parallel version of the Pandas function of the same name.  Please
    see the Pandas docstring for more detailed information about shared keyword
    arguments.

    This function differs from the Pandas version by saving the many partitions
    of a Dask DataFrame in parallel, either to many files, or to many datasets
    within the same file.  You may specify this parallelism with an asterisk
    ``*`` within the filename or datapath, and an optional ``name_function``.
    The asterisk will be replaced with an increasing sequence of integers
    starting from ``0`` or with the result of calling ``name_function`` on each
    of those integers.

    This function only supports the Pandas ``'table'`` format, not the more
    specialized ``'fixed'`` format.

    Parameters
    ----------
    path : string, pathlib.Path
        Path to a target filename. Supports strings, ``pathlib.Path``, or any
        object implementing the ``__fspath__`` protocol. May contain a ``*`` to
        denote many filenames.
    key : string
        Datapath within the files.  May contain a ``*`` to denote many locations
    mode : {'a', 'w', 'r+'}, default 'a'
        Mode used for the first partition written.  When every partition goes
        to the same file, the remaining partitions are always written with
        mode ``'a'``.
    append : bool, default False
        Passed to Pandas for the first partition written.  When every
        partition goes to the same dataset, the remaining partitions are
        always appended.
    name_function : function
        A function to convert the ``*`` in the above options to a string.
        Should take in a number from 0 to the number of partitions and return a
        string. (see examples below)
    compute : bool
        Whether or not to execute immediately.  If False then this returns a
        ``dask.Delayed`` value.
    lock : bool, Lock, optional
        Lock to use to prevent concurrency issues.  By default a
        ``threading.Lock``, ``multiprocessing.Lock`` or ``SerializableLock``
        will be used depending on your scheduler if a lock is required. See
        dask.utils.get_scheduler_lock for more information about lock
        selection.
    scheduler : string
        The scheduler to use, like "threads" or "processes"
    dask_kwargs : dict, optional
        Extra keyword arguments forwarded to the compute call when ``compute``
        is True.
    **other:
        See pandas.to_hdf for more information

    Examples
    --------
    Save Data to a single file

    >>> df.to_hdf('output.hdf', '/data')            # doctest: +SKIP

    Save data to multiple datapaths within the same file:

    >>> df.to_hdf('output.hdf', '/data-*')          # doctest: +SKIP

    Save data to multiple files:

    >>> df.to_hdf('output-*.hdf', '/data')          # doctest: +SKIP

    Save data to multiple files, using the multiprocessing scheduler:

    >>> df.to_hdf('output-*.hdf', '/data', scheduler='processes') # doctest: +SKIP

    Specify custom naming scheme.  This writes files as
    '2000-01-01.hdf', '2000-01-02.hdf', '2000-01-03.hdf', etc..

    >>> from datetime import date, timedelta
    >>> base = date(year=2000, month=1, day=1)
    >>> def name_function(i):
    ...     ''' Convert integer 0 to n to a string '''
    ...     return str(base + timedelta(days=i))

    >>> df.to_hdf('*.hdf', '/data', name_function=name_function) # doctest: +SKIP

    Returns
    -------
    filenames : list
        Returned if ``compute`` is True. List of file names that each partition
        is saved to.
    delayed : dask.Delayed
        Returned if ``compute`` is False. Delayed object to execute ``to_hdf``
        when computed.

    See Also
    --------
    read_hdf:
    to_parquet:
    """
    if dask_kwargs is None:
        dask_kwargs = {}

    name = "to-hdf-" + uuid.uuid1().hex

    pd_to_hdf = df._partition_type.to_hdf

    single_file = True
    single_node = True

    path = stringify_path(path)

    # if path is string, format using i_name
    if isinstance(path, str):
        if path.count("*") + key.count("*") > 1:
            raise ValueError(
                "A maximum of one asterisk is accepted in file path and dataset key"
            )

        fmt_obj = lambda path, i_name: path.replace("*", i_name)

        if "*" in path:
            single_file = False
    else:
        if key.count("*") > 1:
            raise ValueError("A maximum of one asterisk is accepted in dataset key")

        # Non-string targets cannot be templated; always write to the same one.
        fmt_obj = lambda path, _: path

    if "*" in key:
        single_node = False

    if "format" in kwargs and kwargs["format"] not in ["t", "table"]:
        raise ValueError("Dask only support 'table' format in hdf files.")

    if mode not in ("a", "w", "r+"):
        raise ValueError("Mode must be one of 'a', 'w' or 'r+'")

    if name_function is None:
        name_function = build_name_function(df.npartitions - 1)

    # we guarantee partition order is preserved when its saved and read
    # so we enforce name_function to maintain the order of its input.
    if not (single_file and single_node):
        formatted_names = [name_function(i) for i in range(df.npartitions)]
        if formatted_names != sorted(formatted_names):
            warn(
                "To preserve order between partitions name_function "
                "must preserve the order of its input"
            )

    # If user did not specify scheduler and write is sequential default to the
    # sequential scheduler. otherwise let the _get method choose the scheduler
    try:
        from distributed import default_client

        default_client()
        client_available = True
    except (ImportError, ValueError):
        client_available = False

    if (
        scheduler is None
        and not config.get("scheduler", None)
        and not client_available
        and single_node
        and single_file
    ):
        scheduler = "single-threaded"

    # handle lock default based on whether we're writing to a single entity
    _actual_get = get_scheduler(collections=[df], scheduler=scheduler)
    if lock is None:
        if not single_node:
            lock = True
        elif not single_file and _actual_get is not MP_GET:
            # if we're writing to multiple files with the multiprocessing
            # scheduler we don't need to lock
            lock = True
        else:
            lock = False

    # TODO: validation logic to ensure that provided locks are compatible with the scheduler
    if isinstance(lock, bool) and lock:
        lock = get_scheduler_lock(df, scheduler=scheduler)
    elif lock:
        assert isinstance(lock, SupportsLock)

    dsk = {}

    # The first partition uses the caller's ``mode``/``append`` unchanged.
    i_name = name_function(0)
    kwargs.update(
        {
            "format": "table",
            "mode": mode,
            "append": append,
            "key": key.replace("*", i_name),
        }
    )
    dsk[(name, 0)] = (
        _pd_to_hdf,
        pd_to_hdf,
        lock,
        [(df._name, 0), fmt_obj(path, i_name)],
        kwargs,
    )

    # Later partitions must not truncate a shared file or overwrite a shared
    # dataset that an earlier partition has already written.
    kwargs2 = kwargs.copy()
    if single_file:
        kwargs2["mode"] = "a"
    if single_node:
        kwargs2["append"] = True

    filenames = [fmt_obj(path, name_function(i)) for i in range(df.npartitions)]

    for i in range(1, df.npartitions):
        i_name = name_function(i)
        kwargs2["key"] = key.replace("*", i_name)
        task = (
            _pd_to_hdf,
            pd_to_hdf,
            lock,
            [(df._name, i), fmt_obj(path, i_name)],
            kwargs2.copy(),
        )
        if single_file:
            # Writes into one file are chained: onto the previous partition
            # when they share a dataset, otherwise onto the first partition.
            link_dep = i - 1 if single_node else 0
            task = (_link, (name, link_dep), task)
        dsk[(name, i)] = task

    if single_file and single_node:
        # Fully sequential chain: the last task depends on all the others.
        keys = [(name, df.npartitions - 1)]
    else:
        keys = [(name, i) for i in range(df.npartitions)]

    final_name = name + "-final"
    dsk[(final_name, 0)] = (lambda x: None, keys)
    graph = HighLevelGraph.from_collections((name, 0), dsk, dependencies=[df])

    if compute:
        compute_as_if_collection(
            DataFrame, graph, keys, scheduler=scheduler, **dask_kwargs
        )
        return filenames
    else:
        return Scalar(graph, final_name, "")
# Raised (as a TypeError) when a dataset was stored in the 'fixed' format.
dont_use_fixed_error_message = """
This HDFStore is not partitionable and can only be used monolithically with
pandas. In the future when creating HDFStores use the ``format='table'``
option to ensure that your dataset can be parallelized"""

# Raised (as a NotImplementedError) when start/stop is combined with
# several input files.
read_hdf_error_msg = """
The start and stop keywords are not supported when reading from more than
one file/dataset.

The combination is ambiguous because it could be interpreted as the starting
and stopping index per file, or starting and stopping index of the global
dataset."""


class HDFFunctionWrapper(DataFrameIOFunction):
    """
    HDF5 Function-Wrapper Class

    Reads HDF5 data from disk to produce a partition (given a key).

    Parameters
    ----------
    columns : list or None
        Column projection.  Only forwarded to ``pd.read_hdf`` when it is
        truthy and ``dim > 1``.
    dim : int
        Number of dimensions of the data being read.
    lock : object or falsy
        If truthy, acquired around every read and released afterwards.
    common_kwargs : dict
        Keyword arguments passed to every ``pd.read_hdf`` call.
    """

    def __init__(self, columns, dim, lock, common_kwargs):
        self._columns = columns
        self.lock = lock
        self.common_kwargs = common_kwargs
        self.dim = dim
        if columns and dim > 1:
            # ``merge`` builds a new dict; the caller's mapping is untouched.
            self.common_kwargs = merge(common_kwargs, {"columns": columns})

    @property
    def columns(self):
        """The column projection this wrapper was created with."""
        return self._columns

    def project_columns(self, columns):
        """Return a new HDFFunctionWrapper object with
        a sub-column projection.

        Returns ``self`` when ``columns`` equals the current projection.
        """
        if columns == self.columns:
            return self
        return HDFFunctionWrapper(columns, self.dim, self.lock, self.common_kwargs)

    def __call__(self, part):
        """Read from hdf5 file with a lock

        ``part`` is a ``(path, key, kwargs)`` triple; ``kwargs`` override
        ``common_kwargs`` on conflict.
        """
        path, key, kwargs = part
        if self.lock:
            self.lock.acquire()
        try:
            result = pd.read_hdf(path, key, **merge(self.common_kwargs, kwargs))
        finally:
            if self.lock:
                self.lock.release()
        return result
@dataframe_creation_dispatch.register_inplace("pandas")
def read_hdf(
    pattern,
    key,
    start=0,
    stop=None,
    columns=None,
    chunksize=1000000,
    sorted_index=False,
    lock=True,
    mode="r",
):
    """
    Read HDF files into a Dask DataFrame

    Read hdf files into a dask dataframe. This function is like
    ``pandas.read_hdf``, except it can read from a single large file, or from
    multiple files, or from multiple keys from the same file.

    Parameters
    ----------
    pattern : string, pathlib.Path, list
        File pattern (string), pathlib.Path, buffer to read from, or list of
        file paths. Can contain wildcards.
    key : group identifier in the store. Can contain wildcards
    start : optional, integer (defaults to 0), row number to start at
    stop : optional, integer (defaults to None, the last row), row number to
        stop at
    columns : list of columns, optional
        A list of columns that if not None, will limit the return
        columns (default is None)
    chunksize : positive integer, optional
        Maximal number of rows per partition (default is 1000000).
    sorted_index : boolean, optional
        Option to specify whether or not the input hdf files have a sorted
        index (default is False).
    lock : boolean, optional
        Option to use a lock to prevent concurrency issues (default is True).
    mode : {'a', 'r', 'r+'}, default 'r'. Mode to use when opening file(s).
        'r'
            Read-only; no data can be modified.
        'a'
            Append; an existing file is opened for reading and writing,
            and if the file does not exist it is created.
        'r+'
            It is similar to 'a', but the file must already exist.

    Returns
    -------
    dask.DataFrame

    Raises
    ------
    ValueError
        If an empty list of files is given, ``chunksize`` is not positive, or
        ``start``/``stop`` is combined with ``sorted_index``.
    OSError
        If the pattern matches nothing or a listed file does not exist.
    NotImplementedError
        If ``start``/``stop`` is combined with more than one file.

    Examples
    --------
    Load single file

    >>> dd.read_hdf('myfile.1.hdf5', '/x')  # doctest: +SKIP

    Load multiple files

    >>> dd.read_hdf('myfile.*.hdf5', '/x')  # doctest: +SKIP

    >>> dd.read_hdf(['myfile.1.hdf5', 'myfile.2.hdf5'], '/x')  # doctest: +SKIP

    Load multiple datasets

    >>> dd.read_hdf('myfile.1.hdf5', '/*')  # doctest: +SKIP
    """
    if lock is True:
        lock = get_scheduler_lock()

    key = key if key.startswith("/") else "/" + key
    # Convert path-like objects to a string
    pattern = stringify_path(pattern)

    if isinstance(pattern, str):
        paths = sorted(glob(pattern))
    else:
        paths = pattern

    if not isinstance(pattern, str) and len(paths) == 0:
        raise ValueError("No files provided")
    if not paths:
        raise OSError(f"File(s) not found: {pattern}")
    for path in paths:
        try:
            exists = os.path.exists(path)
        except (ValueError, TypeError):
            exists = False
        if not exists:
            raise OSError(f"File not found or insufficient permissions: {path}")
    if (start != 0 or stop is not None) and len(paths) > 1:
        raise NotImplementedError(read_hdf_error_msg)
    if chunksize <= 0:
        raise ValueError("Chunksize must be a positive integer")
    if (start != 0 or stop is not None) and sorted_index:
        raise ValueError(
            "When assuming pre-partitioned data, data must be "
            "read in its entirety using the same chunksizes"
        )

    # Build metadata
    with pd.HDFStore(paths[0], mode=mode) as hdf:
        meta_key = _expand_key(key, hdf)[0]
        try:
            meta = pd.read_hdf(hdf, meta_key, stop=0)
        except IndexError:  # if file is empty, don't set stop
            meta = pd.read_hdf(hdf, meta_key)
    if columns is not None:
        meta = meta[columns]

    # Common kwargs
    if meta.ndim == 1:
        common_kwargs = {"name": meta.name, "mode": mode}
    else:
        common_kwargs = {"mode": mode}

    # Build parts
    parts, divisions = _build_parts(
        paths, key, start, stop, chunksize, sorted_index, mode
    )

    # Construct the output collection with from_map
    return from_map(
        HDFFunctionWrapper(columns, meta.ndim, lock, common_kwargs),
        parts,
        meta=meta,
        divisions=divisions,
        label="read-hdf",
        token=tokenize(paths, key, start, stop, sorted_index, chunksize, mode),
        enforce_metadata=False,
    )
def _build_parts(paths, key, start, stop, chunksize, sorted_index, mode):
    """
    Build the list of partition inputs and divisions for read_hdf

    Returns
    -------
    parts : list
        One ``(path, key, kwargs)`` tuple per output partition.
    divisions : list
        The concatenated divisions of all datasets, or ``len(parts) + 1``
        ``None`` values when no divisions were computed.
    """
    parts = []
    global_divisions = []
    for path in paths:
        keys, stops, divisions = _get_keys_stops_divisions(
            path, key, stop, sorted_index, chunksize, mode
        )

        for k, s, d in zip(keys, stops, divisions):
            if d and global_divisions:
                # Drop the previous dataset's closing boundary before
                # appending the boundaries of the next dataset.
                global_divisions = global_divisions[:-1] + d
            elif d:
                global_divisions = d

            parts.extend(_one_path_one_key(path, k, start, s, chunksize))

    return parts, global_divisions or [None] * (len(parts) + 1)


def _one_path_one_key(path, key, start, stop, chunksize):
    """
    Get the partition inputs corresponding to one path and one key (which
    should not contain any wildcards).

    The row range ``[start, stop)`` is cut into pieces of at most
    ``chunksize`` rows.  The last piece is clamped to ``stop`` so that a
    caller-supplied ``stop`` is honoured even when it is not a multiple of
    ``chunksize`` away from ``start``.

    Raises
    ------
    ValueError
        If ``start >= stop``.
    """

    if start >= stop:
        raise ValueError(
            "Start row number ({}) is above or equal to stop "
            "row number ({})".format(start, stop)
        )

    return [
        (path, key, {"start": s, "stop": min(s + chunksize, stop)})
        for s in range(start, stop, chunksize)
    ]


def _expand_key(key, hdf):
    """
    Return the list of keys in the open store ``hdf`` that match ``key``.

    A key without glob wildcards is returned as-is in a one-element list,
    without checking that it exists in the store.
    """
    # Imported locally under its own name so the module-level ``glob``
    # function is not shadowed inside this function.
    from glob import has_magic

    if not has_magic(key):
        keys = [key]
    else:
        keys = [k for k in hdf.keys() if fnmatch(k, key)]
        # https://github.com/dask/dask/issues/5934
        # TODO: remove this part if/when pandas copes with all keys
        keys.extend(
            n._v_pathname
            for n in hdf._handle.walk_nodes("/", classname="Table")
            if fnmatch(n._v_pathname, key)
            and n._v_name != "table"
            and n._v_pathname not in keys
        )
    return keys


def _get_keys_stops_divisions(path, key, stop, sorted_index, chunksize, mode):
    """
    Get the "keys" or group identifiers which match the given key, which
    can contain wildcards (see _expand_key). This uses the hdf file
    identified by the given path. Also get the index of the last row of
    data for each matched key.

    Returns
    -------
    keys : list
        The matched keys.
    stops : list
        For each key, ``stop`` if given, otherwise the dataset's row count.
    divisions : list
        For each key, a list of index values at chunk boundaries when
        ``sorted_index`` is true, otherwise ``None``.

    Raises
    ------
    TypeError
        If a matched dataset is not stored in the 'table' format.
    ValueError
        If ``stop`` exceeds the number of rows of a matched dataset.
    """
    with pd.HDFStore(path, mode=mode) as hdf:
        stops = []
        divisions = []
        keys = _expand_key(key, hdf)
        for k in keys:
            storer = hdf.get_storer(k)
            if storer.format_type != "table":
                raise TypeError(dont_use_fixed_error_message)
            if stop is None:
                stops.append(storer.nrows)
            elif stop > storer.nrows:
                raise ValueError(
                    "Stop keyword exceeds dataset number "
                    "of rows ({})".format(storer.nrows)
                )
            else:
                stops.append(stop)
            if sorted_index:
                # First index value of every chunk, then the very last index
                # value as the closing boundary.
                division = [
                    storer.read_column("index", start=start, stop=start + 1)[0]
                    for start in range(0, storer.nrows, chunksize)
                ]
                division_end = storer.read_column(
                    "index", start=storer.nrows - 1, stop=storer.nrows
                )[0]

                division.append(division_end)
                divisions.append(division)
            else:
                divisions.append(None)

    return keys, stops, divisions


from dask.dataframe.core import _Frame

# Expose the function's documentation on the DataFrame/Series method too.
_Frame.to_hdf.__doc__ = to_hdf.__doc__