Source code for meerkat.ops.merge

import collections.abc
from typing import List, Sequence, Union

import numpy as np

from meerkat import DataFrame, ObjectColumn
from meerkat.columns.deferred.base import DeferredColumn
from meerkat.columns.scalar import ScalarColumn
from meerkat.columns.tensor.abstract import TensorColumn
from meerkat.columns.tensor.torch import TorchTensorColumn
from meerkat.errors import MergeError
from meerkat.interactive.graph import reactive
from meerkat.ops.decorators import check_primary_key
from meerkat.provenance import capture_provenance


@capture_provenance(capture_args=["left_on", "on", "right_on", "how"])
@check_primary_key
@reactive
def merge(
    left: DataFrame,
    right: DataFrame,
    how: str = "inner",
    on: Union[str, List[str]] = None,
    left_on: Union[str, List[str]] = None,
    right_on: Union[str, List[str]] = None,
    sort: bool = False,
    suffixes: Sequence[str] = ("_x", "_y"),
    validate=None,
) -> DataFrame:
    """Perform a database-style join operation between two DataFrames.

    Only the join-key columns are handed to ``pandas.DataFrame.merge``; every
    other column is rebuilt afterwards from the row indices that the pandas
    merge reports for each side.

    Args:
        left (DataFrame): Left DataFrame.
        right (DataFrame): Right DataFrame.
        how (str, optional): The join type. Defaults to "inner". "cross" is
            not supported.
        on (Union[str, List[str]], optional): The column(s) to join on. These
            columns must be :class:`~meerkat.ScalarColumn`. Defaults to None,
            in which case the `left_on` and `right_on` parameters must be
            passed.
        left_on (Union[str, List[str]], optional): The column(s) in the left
            DataFrame to join on. These columns must be
            :class:`~meerkat.ScalarColumn`. Defaults to None, in which case
            `on` is used.
        right_on (Union[str, List[str]], optional): The column(s) in the right
            DataFrame to join on. These columns must be
            :class:`~meerkat.ScalarColumn`. Defaults to None, in which case
            `on` is used.
        sort (bool, optional): Whether to sort the result DataFrame by the
            join key(s). Defaults to False.
        suffixes (Sequence[str], optional): Suffixes to use in the case there
            are conflicting column names in the result DataFrame. Should be a
            sequence of length two, with ``suffixes[0]`` the suffix for the
            column from the left DataFrame and ``suffixes[1]`` the suffix for
            the right. Defaults to ("_x", "_y").
        validate (str, optional): The check to perform on the result
            DataFrame. Defaults to None, in which case no check is performed.
            Valid options are:

            * "one_to_one" or "1:1": check if merge keys are unique in both
              left and right datasets.
            * "one_to_many" or "1:m": check if merge keys are unique in left
              dataset.
            * "many_to_one" or "m:1": check if merge keys are unique in right
              dataset.
            * "many_to_many" or "m:m": allowed, but does not result in checks.

    Returns:
        DataFrame: The merged DataFrame.

    Raises:
        ValueError: If ``how == "cross"``.
        MergeError: If no join keys can be resolved for one of the two sides,
            if a join column is rejected by ``_check_merge_columns``, or if
            one of the reserved names ``__left_indices__`` /
            ``__right_indices__`` is already used by a join column.
    """
    if how == "cross":
        raise ValueError("DataFrame does not support cross merges.")  # pragma: no cover

    # `on` is the fallback for whichever side was not given explicitly.
    left_on = on if left_on is None else left_on
    right_on = on if right_on is None else right_on
    # Both sides must end up with keys. Checking after the fallback also
    # rejects a lone `left_on` (or `right_on`) without `on`, which would
    # otherwise fail later with an opaque TypeError.
    if left_on is None or right_on is None:
        raise MergeError("Merge expects either `on` or `left_on` and `right_on`")

    # cast `left_on` and `right_on` to lists
    left_on = [left_on] if isinstance(left_on, str) else left_on
    right_on = [right_on] if isinstance(right_on, str) else right_on

    # ensure we can merge on specified columns
    _check_merge_columns(left, left_on)
    _check_merge_columns(right, right_on)

    # Keys named on BOTH sides come back from pandas as a single column. Keep
    # them in `left_on` order (de-duplicated) so the layout of the result does
    # not depend on set iteration order.
    right_on_set = set(right_on)
    shared_on = [key for key in dict.fromkeys(left_on) if key in right_on_set]
    shared_on_set = set(shared_on)

    # convert mk.DataFrame to pd.DataFrame so we can apply Pandas merge
    # (1) only include columns we are joining on
    left_df = left[left_on].to_pandas()
    right_df = right[right_on].to_pandas()
    # (2) add index columns, which we'll use to reconstruct the columns we
    # excluded from the Pandas merge
    if ("__right_indices__" in right_df) or ("__left_indices__" in left_df):
        raise MergeError(
            "The column names '__right_indices__' and '__left_indices__' cannot appear "
            "in the right and left panels respectively. They are used by merge."
        )
    left_df["__left_indices__"] = np.arange(len(left_df))
    right_df["__right_indices__"] = np.arange(len(right_df))

    # apply pandas merge
    merged_df = left_df.merge(
        right_df,
        how=how,
        left_on=left_on,
        right_on=right_on,
        sort=sort,
        validate=validate,
        suffixes=suffixes,
    )
    # row order of the result, expressed as positions into `left` / `right`
    left_indices = merged_df.pop("__left_indices__").values
    right_indices = merged_df.pop("__right_indices__").values
    merged_df = merged_df[shared_on]

    # Reconstruct every column that is not a shared key from the original
    # DataFrames using the row indices returned by the merge. Shared keys are
    # skipped here because they are taken from `merged_df` below.
    left_cols_to_construct = [k for k in left.keys() if k not in shared_on_set]
    right_cols_to_construct = [k for k in right.keys() if k not in shared_on_set]

    # A side contributes nothing when all of its columns are shared keys.
    new_left = (
        _construct_from_indices(left[left_cols_to_construct], left_indices)
        if left_cols_to_construct
        else None
    )
    new_right = (
        _construct_from_indices(right[right_cols_to_construct], right_indices)
        if right_cols_to_construct
        else None
    )

    if new_left is not None and new_right is not None:
        # concatenate the two new dataframes if both have columns, this should
        # be by far the most common case
        merged = new_left.append(new_right, axis="columns", suffixes=suffixes)
    elif new_left is not None:
        merged = new_left
    else:
        # either only the right side has columns, or neither does (None)
        merged = new_right

    if merged is None:
        # both inputs consisted solely of shared key columns
        merged = DataFrame.from_pandas(merged_df)
    else:
        # add columns in both `left_on` and `right_on`,
        # casting to the column type in left
        for name, column in merged_df.items():
            merged.add_column(name, left[name]._clone(data=column.values))
            # rotate the column that was just appended to the front
            merged.data.reorder(merged.columns[-1:] + merged.columns[:-1])

    # set primary key if either the `left` or `right` has a primary key in the
    # result. note, the `check_primary_key` wrapper ensures that the
    # primary_key is actually valid
    if (left.primary_key_name is not None) and left.primary_key_name in merged:
        merged.set_primary_key(left.primary_key_name, inplace=True)
    elif (right.primary_key_name is not None) and right.primary_key_name in merged:
        merged.set_primary_key(right.primary_key_name, inplace=True)
    return merged


def _construct_from_indices(df: DataFrame, indices: np.ndarray):
    """Select rows of `df` in the order given by `indices`.

    Args:
        df (DataFrame): The DataFrame to take rows from.
        indices (np.ndarray): Row positions into `df`, as produced by the
            pandas merge. Entries may be ``nan`` for result rows that have no
            counterpart in `df`.

    Returns:
        ``df[indices]`` when no index is ``nan``. Otherwise a clone of `df`
        in which ``TorchTensorColumn`` / ``ScalarColumn`` columns hold ``nan``
        at the missing rows, and every other column is rebuilt as an
        ``ObjectColumn`` holding ``None`` at the missing rows.
    """
    missing = np.isnan(indices)
    if not missing.any():
        # if there are no `nan`s in the indices, then we can just lazy index
        # the original column
        return df[indices]

    # when performing "outer", "left", and "right" merges, column indices
    # output by pandas merge can include `nan` in rows corresponding to merge
    # keys that only appear in one of the two panels.
    #
    # Casting `nan` to int is an invalid cast whose result is platform
    # dependent, so missing rows are pointed at row 0 as a placeholder; the
    # values read there are overwritten with `nan` below.
    safe_indices = np.where(missing, 0, indices).astype(int)

    data = {}
    for name, col in df.items():
        # NOTE(review): the original tuple listed `TorchTensorColumn` twice;
        # possibly another column type was intended for one entry - confirm.
        if isinstance(col, (TorchTensorColumn, ScalarColumn)):
            new_col = col[safe_indices]
            # make the column able to hold `nan`
            if isinstance(new_col, TorchTensorColumn):
                new_col = new_col.to(float)
            elif isinstance(new_col, ScalarColumn):
                # object-dtype columns are left as they are
                if new_col.dtype != "object":
                    new_col = new_col.astype(float)
            else:
                new_col = new_col.astype(float)
            new_col[missing] = np.nan
            data[name] = new_col
        else:
            # for all other columns, we convert the column to ObjectColumn
            # and fill with `None` wherever the index is `nan`
            data[name] = ObjectColumn(
                [
                    None if is_missing else col[int(index)]
                    for index, is_missing in zip(indices, missing)
                ]
            )
    return df._clone(data=data)


def _check_merge_columns(df: DataFrame, on: List[str]):
    """Verify that every column of `df` named in `on` can be used as a join key.

    Args:
        df (DataFrame): The DataFrame holding the columns.
        on (List[str]): Names of the columns to join on.

    Raises:
        MergeError: If a column is a ``TensorColumn`` with more than one
            dimension, an ``ObjectColumn`` containing an unhashable cell, a
            ``DeferredColumn``, or any other column that is not a
            ``ScalarColumn``.
    """
    for name in on:
        column = df[name]
        if isinstance(column, TensorColumn):
            if len(column.shape) > 1:
                raise MergeError(
                    f"Cannot merge on a TensorColumn column `{name}` that has more "
                    "than one dimension."
                )
        elif isinstance(column, ObjectColumn):
            # generator form stops at the first unhashable cell
            if not all(isinstance(cell, collections.abc.Hashable) for cell in column):
                raise MergeError(
                    f"Cannot merge on column `{name}`, contains unhashable objects."
                )
        elif isinstance(column, DeferredColumn):
            raise MergeError(f"Cannot merge on DeferredColumn `{name}`.")
        elif not isinstance(column, ScalarColumn):
            raise MergeError(f"Cannot merge on column `{name}` of type {type(column)}.")