Source code for meerkat.ops.map

import warnings
from inspect import signature
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Mapping,
    Sequence,
    Tuple,
    Type,
    Union,
)

import meerkat.tools.docs as docs
from meerkat.block.abstract import BlockView

if TYPE_CHECKING:
    from meerkat.columns.abstract import Column
    from meerkat.columns.deferred.base import DeferredColumn
    from meerkat.dataframe import DataFrame


_SHARED_DOCS_ = {
    "input_description": docs.DescriptionSection(
        """
    *What gets passed to the function?*

    *   If ${data} is a :class:`DataFrame` and ``outputs`` is not passed, then the \
        function's signature is inspected to determine which columns to pass as \
        keyword arguments to the function.
        For example, if the function is
        ``lambda age, residence: age > 18 and residence == "NY"``, then
        the columns ``age`` and ``residence`` will be passed to the function. If the
        columns are not present in the DataFrame, then a `ValueError` will be raised.
    *   If ${data} is a :class:`DataFrame` and ``outputs`` is ``"single"``, then \
        the mapping between columns and function arguments can be overridden by
        passing the ``inputs`` argument.
    *   If ${data} is a :class:`Column`, then the values of the
        column are passed as a single positional argument to the function. The
        ``inputs`` argument is ignored.
    """
    ),
    "function": docs.Arg(
        """
        function (Callable): The function that will be applied to the rows of
            ``${data}``.
        """
    ),
    "is_batched_fn": docs.Arg(
        """
        is_batched_fn (bool, optional): Whether the function must be applied on a
            batch of rows. Defaults to False.
        """
    ),
    "batch_size": docs.Arg(
        """
        batch_size (int, optional): The size of the batch. Defaults to 1.
        """
    ),
    "inputs": docs.Arg(
        """
        inputs (Dict[str, str], optional): Dictionary mapping column names in
            ``${data}`` to keyword arguments of ``function``. Ignored if ``${data}`` is
            a column. When calling ``function``, values from the columns are fed to
            the corresponding keyword arguments. Defaults to None, in which case the
            signature of ``function`` is inspected and the columns in the DataFrame
            with the same names as its parameters are passed to it. If the function
            takes a non-default argument that is not a column in the DataFrame, the
            operation will raise a `ValueError`.
        """
    ),
    "outputs": docs.Arg(
        """
        outputs (Union[Dict[Any, str], Tuple[str]], optional): Controls how the output
            of ``function`` is mapped to the output of :func:`${name}`.
            Defaults to ``None``.

            *   If ``None``: the output is inferred from the return type of the
                function. See the explanation above.
            *   If ``"single"``: a single :class:`DeferredColumn` is returned.
            *   If a ``Dict[Any, str]``: then a :class:`DataFrame` containing
                DeferredColumns is returned. This is useful when the output of
                ``function`` is a ``Dict``. ``outputs`` maps the outputs of ``function``
                to column names in the resulting :class:`DataFrame`.
            *   If a ``Tuple[str]``: then a :class:`DataFrame` containing
                :class:`DeferredColumn` objects is returned. This is useful when the
                output of ``function`` is a ``Tuple``. ``outputs`` maps the outputs of
                ``function`` to column names in the resulting :class:`DataFrame`.
        """
    ),
    "output_type": docs.Arg(
        """
        output_type (Union[Dict[str, type], type], optional): Coerce the type of the
            output column(s). Defaults to None, in which case the output type is
            inferred from the first row.
        """
    ),
    "materialize": docs.Arg(
        """
        materialize (bool, optional): Whether to materialize the input column(s).
            Defaults to True.
        """
    ),
    "use_ray": docs.Arg(
        """
        use_ray (bool): Use Ray to parallelize the computation. Defaults to False.
        """
    ),
    "num_blocks": docs.Arg(
        """
        num_blocks (int): When using Ray, the number of blocks to split the data into.
            Defaults to 100.
        """
    ),
    "blocks_per_window": docs.Arg(
        """
        blocks_per_window (int): When using Ray, the number of blocks to process in a
            single Ray task. Defaults to 10.
        """
    ),
    "pbar": docs.Arg(
        """
        pbar (bool): Show a progress bar. Defaults to False.
        """
    ),
}
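

# The helper below is a minimal, self-contained sketch (not part of meerkat) of the
# signature-inspection rule described in the ``inputs`` entry above: function
# parameter names are matched against column names, and a non-default parameter
# with no matching column is an error. The column names in the usage comment are
# hypothetical.
def _example_infer_inputs(columns: Dict[str, list], fn: Callable) -> Dict[str, list]:
    """Return the subset of ``columns`` whose keys match ``fn``'s parameter names."""
    kwargs = {}
    for name, param in signature(fn).parameters.items():
        if name in columns:
            # the parameter name matches a column name: pass that column
            kwargs[name] = columns[name]
        elif param.default is param.empty:
            # a required parameter with no matching column cannot be filled
            raise ValueError(f"No column found for non-default argument '{name}'")
    return kwargs


# _example_infer_inputs({"age": [20, 30], "residence": ["NY", "MA"]},
#                       lambda age, residence: age)
# -> {"age": [20, 30], "residence": ["NY", "MA"]}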


@docs.doc(source=_SHARED_DOCS_, data="data", name="defer")
def defer(
    data: Union["DataFrame", "Column"],
    function: Callable,
    is_batched_fn: bool = False,
    batch_size: int = 1,
    inputs: Union[Mapping[str, str], Sequence[str]] = None,
    outputs: Union[Mapping[Any, str], Sequence[str]] = None,
    output_type: Union[Mapping[str, Type["Column"]], Type["Column"]] = None,
    materialize: bool = True,
) -> Union["DataFrame", "DeferredColumn"]:
    """Create one or more DeferredColumns that lazily apply a function to each
    row in ${data}.

    This function shares nearly the exact same signature with :func:`map`; the
    difference is that :func:`~meerkat.defer` returns a column that has not yet
    been computed. It is a placeholder for a column that will be computed later.

    Learn more in the user guide: :ref:`guide/dataframe/ops/mapping/deferred`.

    ${input_description}

    *What gets returned by defer?*

    *   If ``function`` returns a single value, then ``defer`` will return a
        :class:`DeferredColumn` object.
    *   If ``function`` returns a dictionary, then ``defer`` will return a
        :class:`DataFrame` containing :class:`DeferredColumn` objects. The keys
        of the dictionary are used as column names. The ``outputs`` argument can
        be used to override the column names.
    *   If ``function`` returns a tuple, then ``defer`` will return a
        :class:`DataFrame` containing :class:`DeferredColumn` objects. The column
        names will be integers. The column names can be overridden by passing a
        tuple to the ``outputs`` argument.
    *   If ``function`` returns a tuple or a dictionary, then passing ``"single"``
        to the ``outputs`` argument will cause ``defer`` to return a single
        :class:`DeferredColumn` that materializes to an :class:`ObjectColumn`.

    *How do you execute the deferred map?*

    Depending on ``function`` and the ``outputs`` argument, ``defer`` returns
    either a :class:`DeferredColumn` or a :class:`DataFrame`. Both are
    **callables**. To execute the deferred map, simply call the returned object.

    .. note::
        This function is also available as a method of :class:`DataFrame` and
        :class:`Column` under the name ``defer``.

    Args:
        ${data} (DataFrame): The :class:`DataFrame` or :class:`Column` to which the
            function will be applied.
        ${function}
        ${is_batched_fn}
        ${batch_size}
        ${inputs}
        ${outputs}
        ${output_type}
        ${materialize}

    Returns:
        Union[DataFrame, DeferredColumn]: A :class:`DeferredColumn` or a
        :class:`DataFrame` containing :class:`DeferredColumn` objects,
        representing the deferred map.

    Examples
    --------
    We start with a small DataFrame of voters with two columns: ``birth_year``,
    which contains the birth year of each person, and ``residence``, which
    contains the state in which each person lives.

    .. ipython:: python

        import datetime
        import meerkat as mk

        df = mk.DataFrame({
            "birth_year": [1967, 1993, 2010, 1985, 2007, 1990, 1943],
            "residence": ["MA", "LA", "NY", "NY", "MA", "MA", "LA"]
        })

    **Single input column.** Lazily map a column of birth years to a column of
    ages.

    .. ipython:: python

        df["age"] = df["birth_year"].defer(
            lambda x: datetime.datetime.now().year - x
        )
        df["age"]

    We can materialize the deferred map (*i.e.* run it) by calling the column.

    .. ipython:: python

        df["age"]()

    **Multiple input columns.** Lazily combine the ``age`` and ``residence``
    columns to compute whether each person is an eligible voter in MA.

    .. ipython:: python

        df["ma_eligible"] = df.defer(
            lambda age, residence: (residence == "MA") and (age >= 18)
        )
        df["ma_eligible"]()
    """
    from meerkat import DeferredColumn
    from meerkat.block.deferred_block import DeferredBlock, DeferredOp
    from meerkat.columns.abstract import Column, infer_column_type
    from meerkat.dataframe import DataFrame

    base_function = function
    # prepare arguments for DeferredOp
    if isinstance(data, Column):
        args = [data]
        kwargs = {}
    elif isinstance(data, DataFrame):
        args, kwargs = None, None
        if inputs == "row":
            pass
        elif isinstance(inputs, Mapping):
            args = []
            kwargs = {kw: data[col_name] for col_name, kw in inputs.items()}
        elif isinstance(inputs, Sequence):
            # TODO: make this work with a list
            args = [data[col_name] for col_name in inputs]
            kwargs = {}
        elif inputs is None:
            # infer the mapping from the function signature if possible,
            # otherwise pass the full row
            args = []
            kwargs = {}
            for name, param in signature(function).parameters.items():
                if name in data:
                    kwargs[name] = data[name]
                elif param.default is param.empty:
                    warnings.warn(
                        f"Non-default argument '{name}' does not have a corresponding "
                        "column in the DataFrame. If your function expects a full "
                        "DataFrame row, pass ``inputs='row'`` to ``map``. Otherwise, "
                        "please provide an `inputs` mapping "
                        "or pass a lambda function with a different signature. "
                        "See map documentation for more details.",
                    )
                    inputs = "row"
                    break

        if inputs == "row":
            args = []
            kwargs = {col_name: col for col_name, col in data.items()}

            def wrapper(*args, **kwargs):
                # FIXME: this should use data._clone instead!
                if is_batched_fn:
                    kwargs = DataFrame(kwargs)
                return base_function(kwargs)

            function = wrapper

        if args is None or kwargs is None:
            raise ValueError("``inputs`` must be a Mapping, a Sequence, or 'row'")

    op = DeferredOp(
        fn=function,
        args=args,
        kwargs=kwargs,
        is_batched_fn=is_batched_fn,
        batch_size=batch_size,
        return_format=type(outputs) if outputs is not None else None,
        materialize_inputs=materialize,
    )

    block = DeferredBlock.from_block_data(data=op)
    first_row = op._get(0) if len(op) > 0 else None

    if outputs is None and isinstance(first_row, Dict):
        # support splitting a dict into multiple columns without specifying outputs
        outputs = {output_key: output_key for output_key in first_row}
        op.return_format = type(outputs)

    if outputs is None and isinstance(first_row, Tuple):
        # support splitting a tuple into multiple columns without specifying outputs
        outputs = tuple([str(i) for i in range(len(first_row))])
        op.return_format = type(outputs)

    if outputs is None or outputs == "single":
        # can only infer the output type if the input columns are nonempty
        if output_type is None and first_row is not None:
            output_type = infer_column_type([first_row])

        if not isinstance(output_type, Type):
            raise ValueError(
                "Must provide a single `output_type` if `outputs` is None."
            )

        col = DeferredColumn(
            data=BlockView(block_index=None, block=block), output_type=output_type
        )
        if isinstance(data, Column):
            col.formatters = data.formatters.defer()
        return col
    elif isinstance(outputs, Mapping):
        if output_type is None:
            output_type = {
                outputs[output_key]: infer_column_type([col])
                for output_key, col in first_row.items()
            }
        if not isinstance(output_type, Mapping):
            raise ValueError(
                "Must provide an `output_type` mapping if `outputs` is a mapping."
            )
        return DataFrame(
            {
                col: DeferredColumn(
                    data=BlockView(block_index=output_key, block=block),
                    output_type=output_type[outputs[output_key]],
                )
                for output_key, col in outputs.items()
            }
        )
    elif isinstance(outputs, Sequence):
        if output_type is None:
            output_type = [type(col) for col in first_row]
        if not isinstance(output_type, Sequence):
            raise ValueError(
                "Must provide an `output_type` sequence if `outputs` is a sequence."
            )
        return DataFrame(
            {
                col: DeferredColumn(
                    data=BlockView(block_index=output_key, block=block),
                    output_type=output_type[output_key],
                )
                for output_key, col in enumerate(outputs)
            }
        )
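

# Usage sketch for ``defer`` with a dict-returning function (the column and key
# names here are hypothetical, not from the meerkat docs). A dict return value
# yields a DataFrame of DeferredColumns, and an ``outputs`` mapping renames the
# dict keys to column names:
#
#     import meerkat as mk
#
#     df = mk.DataFrame({"text": ["a", "ab", "abc"]})
#     out = df.defer(
#         lambda text: {"len": len(text), "upper": text.upper()},
#         outputs={"len": "n_chars", "upper": "text_upper"},
#     )
#     out    # DataFrame with DeferredColumns "n_chars" and "text_upper"
#     out()  # executes the deferred map, returning materialized columns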


@docs.doc(source=_SHARED_DOCS_, data="data", name="map")
def map(
    data: Union["DataFrame", "Column"],
    function: Callable,
    is_batched_fn: bool = False,
    batch_size: int = 1,
    inputs: Union[Mapping[str, str], Sequence[str]] = None,
    outputs: Union[Mapping[Any, str], Sequence[str]] = None,
    output_type: Union[Mapping[str, Type["Column"]], Type["Column"]] = None,
    materialize: bool = True,
    use_ray: bool = False,
    num_blocks: int = 100,
    blocks_per_window: int = 10,
    pbar: bool = False,
    **kwargs,
):
    """Create a new :class:`Column` or :class:`DataFrame` by applying a function
    to each row in ${data}.

    This function shares nearly the exact same signature with :func:`defer`; the
    difference is that :func:`~meerkat.defer` returns a column that has not yet
    been computed. It is a placeholder for a column that will be computed later.

    Learn more in the user guide: :ref:`guide/dataframe/ops/mapping`.

    ${input_description}

    *What gets returned by map?*

    *   If ``function`` returns a single value, then ``map`` will return a
        :class:`Column` object.
    *   If ``function`` returns a dictionary, then ``map`` will return a
        :class:`DataFrame`. The keys of the dictionary are used as column names.
        The ``outputs`` argument can be used to override the column names.
    *   If ``function`` returns a tuple, then ``map`` will return a
        :class:`DataFrame`. The column names will be integers. The column names
        can be overridden by passing a tuple to the ``outputs`` argument.
    *   If ``function`` returns a tuple or a dictionary, then passing ``"single"``
        to the ``outputs`` argument will cause ``map`` to return a single
        :class:`ObjectColumn`.

    .. note::
        This function is also available as a method of :class:`DataFrame` and
        :class:`Column` under the name ``map``.

    Args:
        ${data} (DataFrame): The :class:`DataFrame` or :class:`Column` to which the
            function will be applied.
        ${function}
        ${is_batched_fn}
        ${batch_size}
        ${inputs}
        ${outputs}
        ${output_type}
        ${materialize}
        ${use_ray}
        ${num_blocks}
        ${blocks_per_window}
        ${pbar}

    Returns:
        Union[DataFrame, Column]: A :class:`Column` or a :class:`DataFrame`.

    Examples
    --------
    We start with a small DataFrame of voters with two columns: ``birth_year``,
    which contains the birth year of each person, and ``residence``, which
    contains the state in which each person lives.

    .. ipython:: python

        import datetime
        import meerkat as mk

        df = mk.DataFrame({
            "birth_year": [1967, 1993, 2010, 1985, 2007, 1990, 1943],
            "residence": ["MA", "LA", "NY", "NY", "MA", "MA", "LA"]
        })

    **Single input column.** Map a column of birth years to a column of ages.

    .. ipython:: python

        df["age"] = df["birth_year"].map(
            lambda x: datetime.datetime.now().year - x
        )
        df["age"]

    **Multiple input columns.** Combine the ``age`` and ``residence`` columns to
    compute whether each person is an eligible voter in MA.

    .. ipython:: python

        df["ma_eligible"] = df.map(
            lambda age, residence: (residence == "MA") and (age >= 18)
        )
        df["ma_eligible"]
    """
    deferred = defer(
        data=data,
        function=function,
        is_batched_fn=is_batched_fn,
        batch_size=batch_size,
        inputs=inputs,
        outputs=outputs,
        output_type=output_type,
        materialize=materialize,
    )
    return _materialize(
        deferred,
        batch_size=batch_size,
        pbar=pbar,
        use_ray=use_ray,
        num_blocks=num_blocks,
        blocks_per_window=blocks_per_window,
    )
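

# Usage sketch for a batched ``map`` (assumes numpy is installed; ``mk.column`` is
# meerkat's column factory). With ``is_batched_fn=True``, ``function`` receives a
# batch of up to ``batch_size`` rows at a time, so it can be vectorized:
#
#     import numpy as np
#     import meerkat as mk
#
#     col = mk.column(np.arange(10))
#     doubled = col.map(lambda x: x * 2, is_batched_fn=True, batch_size=4)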


def _materialize(
    data: Union["DataFrame", "Column"],
    batch_size: int,
    pbar: bool,
    use_ray: bool,
    num_blocks: int,
    blocks_per_window: int,
):
    if use_ray:
        import logging

        import numpy as np
        import pandas as pd
        import pyarrow as pa
        import ray
        import torch

        import meerkat as mk
        from meerkat.columns.abstract import column

        ray.init(ignore_reinit_error=True, logging_level=logging.ERROR)
        ray.data.set_progress_bars(enabled=pbar)

        # Step 1: Walk through the DeferredColumns and build a list of functions
        curr = data
        fns = []
        while isinstance(curr, mk.DeferredColumn):
            fns.append(curr.data.fn)

            # For linear pipelines, there will be either one elem in args or one
            # key in kwargs
            if curr.data.args:
                if len(curr.data.args) > 1:
                    raise ValueError(
                        "Multiple args not supported with `use_ray=True`."
                    )
                curr = curr.data.args[0]
            elif curr.data.kwargs:
                if len(curr.data.kwargs) > 1:
                    raise ValueError(
                        "Multiple kwargs not supported with `use_ray=True`."
                    )
                curr = curr.data.kwargs[next(iter(curr.data.kwargs))]
            else:
                raise ValueError("No args or kwargs.")

        # Step 2: Create the ray dataset from the base column
        # TODO (dean): test added_dim on other data types
        added_dim = False
        if isinstance(curr, mk.PandasScalarColumn):
            ds = ray.data.from_pandas(pd.DataFrame({"0": curr})).repartition(
                num_blocks
            )
            fns.append(lambda x: x["0"])
        elif isinstance(curr, mk.ArrowScalarColumn):
            ds = ray.data.from_arrow(pa.table({"0": curr.data})).repartition(
                num_blocks
            )
            fns.append(lambda x: x["0"])
        elif isinstance(curr, mk.NumPyTensorColumn):
            ndarrays = curr.data
            if ndarrays.ndim == 1:
                added_dim = True
                ndarrays = np.expand_dims(ndarrays, 1)
            ds = ray.data.from_numpy(ndarrays).repartition(num_blocks)
        elif isinstance(curr, mk.TorchTensorColumn):
            ds = ray.data.from_torch(curr).repartition(num_blocks)
        elif isinstance(curr, mk.ObjectColumn):
            ds = ray.data.from_items(curr).repartition(num_blocks)
        elif isinstance(curr, mk.DataFrame):
            raise ValueError(
                "Multiple outputs (fan-out) not supported with `use_ray=True`."
            )
            # TODO (dean): Support fan-out (would have to create multiple pipelines)
            # ds = ray.data.from_pandas(curr.data._repr_pandas_()[0])
            # fns.append(lambda row: row.values())
        else:
            raise ValueError(
                f"Base column is of unsupported type {type(curr)} with `use_ray=True`."
            )

        # Step 3: Build the pipeline by walking backwards through fns
        pipe: ray.data.DatasetPipeline = ds.window(
            blocks_per_window=blocks_per_window
        )
        for fn in reversed(fns):
            # TODO (dean): if batch_size > 1, then use map_batches
            pipe = pipe.map(fn)

        # Step 4: Collect the results
        result_ds = next(
            iter(pipe.rewindow(blocks_per_window=num_blocks).iter_datasets())
        )
        result = []
        if data._output_type == mk.NumPyTensorColumn:
            for partition in result_ds.to_numpy_refs():
                res = ray.get(partition)
                if len(res):
                    result.append(res[0][0] if added_dim else res[0])
            if added_dim:
                return mk.NumPyTensorColumn.from_array(result)
            return column(np.stack(result))
        elif data._output_type == mk.TorchTensorColumn:
            for partition in result_ds.to_torch():
                result.append(partition[0])
            return column(torch.stack(result))
        elif data._output_type == mk.PandasScalarColumn:
            for partition in result_ds.to_pandas_refs():
                result.append(ray.get(partition))
            return column(pd.concat(result)["value"])
        elif data._output_type == mk.ArrowScalarColumn:
            for partition in result_ds.to_arrow_refs():
                result.append(ray.get(partition)["value"].combine_chunks())
            return column(pa.concat_arrays(result))
        elif data._output_type == mk.ObjectColumn:
            for partition in result_ds.iter_batches():
                result.extend(partition)
            return column(result)
        else:
            raise ValueError(
                f"Unsupported output type {data._output_type} with `use_ray=True`."
            )
    else:
        from tqdm import tqdm

        from .concat import concat

        result = []
        for batch_start in tqdm(range(0, len(data), batch_size), disable=not pbar):
            result.append(
                data._get(
                    slice(batch_start, batch_start + batch_size, 1), materialize=True
                )
            )
        return concat(result)
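

# A standalone sketch of the non-Ray path above: materialize a deferred sequence in
# fixed-size slices, then concatenate the pieces. The plain-list ``data`` and the
# slicing stand in for ``data._get(..., materialize=True)`` and ``concat``; the
# names are illustrative, not meerkat API.
def _example_batched_materialize(n: int, batch_size: int) -> list:
    data = list(range(n))  # stand-in for a deferred column of length n
    result = []
    for batch_start in range(0, n, batch_size):
        # materialize one batch at a time (here, slicing is the "work")
        result.append(data[batch_start : batch_start + batch_size])
    # flatten the per-batch results, analogous to ``concat(result)``
    return [x for batch in result for x in batch]


# _example_batched_materialize(10, 4) == list(range(10))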