Mapping: map and defer
In this guide, we discuss how to create new columns by applying a function to each row of existing columns, an operation we call mapping. We provide detailed examples of how to use the map() operation. We also introduce the update() and filter() operations, which are utilities that wrap the map() operation.
Map¶
Let’s warm up with an example: converting a column of birth years to a column of ages. We start with a small DataFrame of voters with two columns: birth_year, which contains the birth year of each person, and residence, which contains the state in which each person lives.
import meerkat as mk
df = mk.DataFrame({
"birth_year": [1967, 1993, 2010, 1985, 2007, 1990, 1943],
"residence": ["MA", "LA", "NY", "NY", "MA", "MA", "LA"]
})
Map with a single input column¶
We want to create a new column, age, which contains the age of each person. We can do this with the map() operation, passing a lambda function that takes a birth year and returns the age.
import datetime
df["age"] = df["birth_year"].map(
lambda x: datetime.datetime.now().year - x
)
df
 | birth_year | residence | age |
---|---|---|---|
0 | 1967 | MA | 56 |
1 | 1993 | LA | 30 |
2 | 2010 | NY | 13 |
3 | 1985 | NY | 38 |
4 | 2007 | MA | 16 |
5 | 1990 | MA | 33 |
6 | 1943 | LA | 80 |
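Note that we add the new column to the DataFrame in place by assigning the result of the map() operation to the age column. Because the lambda reads the current year, the ages you get may differ from those in the table, which correspond to the year 2023.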
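To make the per-row pattern concrete, here is a minimal plain-Python sketch (not Meerkat's implementation) of what the map() call above computes: the same function is called once per value, and the results keep the row order. The constant REFERENCE_YEAR and the helper to_age are hypothetical; pinning the year to 2023 reproduces the table and makes the output independent of when the code runs. Passing a lambda such as lambda x: 2023 - x to map() would pin the year in the same way.

```python
from typing import List

# Hypothetical constant: pinning the year makes the output reproducible.
# 2023 reproduces the ages shown in the table above.
REFERENCE_YEAR = 2023

birth_years: List[int] = [1967, 1993, 2010, 1985, 2007, 1990, 1943]


def to_age(birth_year: int, reference_year: int = REFERENCE_YEAR) -> int:
    """Convert a single birth year into an age."""
    return reference_year - birth_year


# A row-wise map: call the function once per value, keeping the row order.
ages: List[int] = [to_age(year) for year in birth_years]
print(ages)  # [56, 30, 13, 38, 16, 33, 80]
```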
Map with multiple input columns¶
We can also map a function that takes more than one column as input. For example, say we want to create a new column ma_eligible that indicates whether or not a person is eligible to vote in Massachusetts. We can do this with the map() operation, passing a lambda function that takes age and residence.
df["ma_eligible"] = df.map(
lambda age, residence: (residence == "MA") and (age >= 18)
)
df
 | birth_year | residence | age | ma_eligible |
---|---|---|---|---|
0 | 1967 | MA | 56 | True |
1 | 1993 | LA | 30 | False |
2 | 2010 | NY | 13 | False |
3 | 1985 | NY | 38 | False |
4 | 2007 | MA | 16 | False |
5 | 1990 | MA | 33 | True |
6 | 1943 | LA | 80 | False |
The call to map() inspects the signature of the function and determines that it takes two arguments: age and residence. It then finds the columns with the same names in the DataFrame and passes the corresponding values to the function. If the function takes a non-default argument that is not a column in the DataFrame, the operation will raise a ValueError.
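The name matching described above can be sketched in plain Python with the standard inspect module. This is a simplified illustration under our own assumptions, not Meerkat's implementation: the helper map_by_signature is hypothetical, columns are plain lists in a dict, and *args/**kwargs parameters are simply skipped.

```python
import inspect
from typing import Any, Callable, Dict, List


def map_by_signature(function: Callable[..., Any], columns: Dict[str, List[Any]]) -> List[Any]:
    """Match the function's parameter names to column names, then call the
    function once per row with the matching values as keyword arguments."""
    arg_names = []
    for name, param in inspect.signature(function).parameters.items():
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue  # *args / **kwargs cannot be matched by name
        if name in columns:
            arg_names.append(name)
        elif param.default is inspect.Parameter.empty:
            # A required argument with no matching column cannot be filled.
            raise ValueError(f"Argument {name!r} does not match any column.")
        # Otherwise the parameter has a default and is left to it.
    n_rows = len(next(iter(columns.values())))
    return [function(**{name: columns[name][i] for name in arg_names}) for i in range(n_rows)]


columns = {
    "age": [56, 30, 13, 38, 16, 33, 80],
    "residence": ["MA", "LA", "NY", "NY", "MA", "MA", "LA"],
}
ma_eligible = map_by_signature(
    lambda age, residence: (residence == "MA") and (age >= 18), columns
)
print(ma_eligible)  # [True, False, False, False, False, True, False]
```

A parameter such as lambda age, state: ... raises a ValueError here because no state column exists, while a parameter with a default value (for example bonus=0) is simply left to its default.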
We can also specify the correspondence between arguments and columns explicitly with the inputs argument to map(), a dictionary that maps column names to argument names. While inspecting the function signature is convenient, it can be error-prone with functions that take many arguments, some of which may spuriously match column names. The cell below is functionally equivalent to the one above, but slightly more verbose.
df["ma_eligible"] = df.map(
lambda x, y: (x == "MA") and (y >= 18),
inputs={"age": "y", "residence": "x"}
)
df
 | birth_year | residence | age | ma_eligible |
---|---|---|---|---|
0 | 1967 | MA | 56 | True |
1 | 1993 | LA | 30 | False |
2 | 2010 | NY | 13 | False |
3 | 1985 | NY | 38 | False |
4 | 2007 | MA | 16 | False |
5 | 1990 | MA | 33 | True |
6 | 1943 | LA | 80 | False |
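With an explicit mapping, no signature inspection is needed: each column is routed to the argument named in inputs. The minimal plain-Python sketch below assumes the direction used in the cell above (keys are column names, values are argument names); the helper map_with_inputs is hypothetical and not part of Meerkat.

```python
from typing import Any, Callable, Dict, List


def map_with_inputs(
    function: Callable[..., Any],
    columns: Dict[str, List[Any]],
    inputs: Dict[str, str],
) -> List[Any]:
    """Call `function` once per row. `inputs` maps column names to argument
    names, the same direction as inputs={"age": "y", "residence": "x"}."""
    n_rows = len(columns[next(iter(inputs))])
    return [
        function(**{arg: columns[col][i] for col, arg in inputs.items()})
        for i in range(n_rows)
    ]


columns = {
    "age": [56, 30, 13, 38, 16, 33, 80],
    "residence": ["MA", "LA", "NY", "NY", "MA", "MA", "LA"],
}
ma_eligible = map_with_inputs(
    lambda x, y: (x == "MA") and (y >= 18),
    columns,
    inputs={"age": "y", "residence": "x"},
)
print(ma_eligible)  # [True, False, False, False, False, True, False]
```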
Note
Some readers may wonder whether map() was the right choice in the above example. After all, we could have written a vectorized expression: df["ma_eligible"] = (df["residence"] == "MA") & (df["age"] >= 18). In many cases, this would indeed be more efficient. The example above is meant to illustrate the general pattern of using map(); the examples in the following sections highlight its benefits.
Map with multiple output columns¶
It’s also possible to map a single function that returns multiple values. For example, say we want to create two columns, ma_eligible and la_eligible, that indicate whether or not a person is eligible to vote in Massachusetts and Louisiana, respectively. We can do this with the map() operation, passing a function that takes age and residence and returns a tuple of two booleans.
def is_eligibile(age, residence):
old_enough = age >= 18
return (residence == "MA") and old_enough, (residence == "LA") and old_enough
df.map(is_eligibile)
Note that the output of the function is split into two columns. The names of the columns are just the positions of the values in the tuple returned by the function. We can rename the columns by passing a tuple of column names to the outputs argument of map().
df.map(is_eligibile, outputs=("ma_eligible", "la_eligible"))
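The splitting described above can be sketched in plain Python: per-row tuples are transposed into one list per tuple position, named by position unless names are supplied. This is an illustration under our own assumptions, not Meerkat's code; split_tuple_outputs and eligibility are hypothetical names.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple


def split_tuple_outputs(
    rows: List[Tuple[Any, ...]], outputs: Optional[Sequence[str]] = None
) -> Dict[Any, List[Any]]:
    """Turn per-row tuples into one list per tuple position. Without `outputs`,
    the positions 0, 1, ... serve as column names."""
    n_outputs = len(rows[0])
    names = list(outputs) if outputs is not None else list(range(n_outputs))
    if len(names) != n_outputs:
        raise ValueError("Expected one name per returned value.")
    columns: Dict[Any, List[Any]] = {name: [] for name in names}
    for row in rows:
        if len(row) != n_outputs:
            raise ValueError("Every call must return the same number of values.")
        for name, value in zip(names, row):
            columns[name].append(value)
    return columns


def eligibility(age: int, residence: str) -> Tuple[bool, bool]:
    old_enough = age >= 18
    return (residence == "MA") and old_enough, (residence == "LA") and old_enough


ages = [56, 30, 13, 38, 16, 33, 80]
residences = ["MA", "LA", "NY", "NY", "MA", "MA", "LA"]
rows = [eligibility(a, r) for a, r in zip(ages, residences)]
columns = split_tuple_outputs(rows, outputs=("ma_eligible", "la_eligible"))
print(columns["ma_eligible"])  # [True, False, False, False, False, True, False]
print(columns["la_eligible"])  # [False, True, False, False, False, False, True]
```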
Instead of outputting two columns, one for each state, we may want to output a single ObjectColumn containing tuples. To do this, we can pass "single" to the outputs argument.
df.map(is_eligibile, outputs="single")
Warning
ObjectColumn is a column type that can store arbitrary Python objects, but it is backed by a Python list, which makes it much slower than other column types. We discuss this more in the guide on Object Columns.
If the function returns a dictionary, we can skip the outputs argument and map() will automatically use the keys of the dictionary as column names.
def is_eligibile(age, residence):
old_enough = age >= 18
return {
"ma_eligible": (residence == "MA") and old_enough,
"la_eligible": (residence == "LA") and old_enough
}
df.map(is_eligibile)
If we would like to use different names for the columns or keep only a subset of the keys, we can pass a dictionary that maps keys to column names to the outputs argument.
df.map(is_eligibile, outputs={"ma_eligible": "ma", "la_eligible": "la"})
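A plain-Python sketch of the dictionary case, assuming the behavior described above: the keys of the returned dicts become column names, and an optional mapping renames keys and keeps only the keys it lists. The helper split_dict_outputs is hypothetical and not Meerkat's implementation.

```python
from typing import Any, Dict, List, Optional


def split_dict_outputs(
    rows: List[Dict[str, Any]], outputs: Optional[Dict[str, str]] = None
) -> Dict[str, List[Any]]:
    """Turn per-row dicts into columns. Without `outputs`, the keys of the
    first row become column names; with `outputs`, only the listed keys are
    kept, renamed to the given column names."""
    keys = list(rows[0])
    rename = outputs if outputs is not None else {key: key for key in keys}
    columns: Dict[str, List[Any]] = {new: [] for new in rename.values()}
    for row in rows:
        if set(row) != set(keys):
            raise ValueError("Every call must return the same keys.")
        for old, new in rename.items():
            columns[new].append(row[old])
    return columns


rows = [
    {"ma_eligible": True, "la_eligible": False},
    {"ma_eligible": False, "la_eligible": True},
]
print(split_dict_outputs(rows))  # {'ma_eligible': [True, False], 'la_eligible': [False, True]}
print(split_dict_outputs(rows, outputs={"ma_eligible": "ma"}))  # {'ma': [True, False]}
```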
Warning
Consistent number of outputs. If a function returns multiple values (either as a tuple or a dictionary), the number of values (or the set of keys) must be consistent across all calls to the function.
Consistent type of outputs. The type of the resulting column is inferred from the first row of the input column(s). As a result, if later rows return values of a different type or shape, an error may be raised because the value cannot be inserted into the inferred column type. To explicitly specify the type of the output column, use the output_type argument to map().
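The type-inference pitfall can be illustrated with a small plain-Python sketch. Here Python types stand in for Meerkat column types, the helper check_column_type is hypothetical, and Meerkat's actual error type and message may differ: the type is taken from the first value unless one is declared up front, and every later value must fit it.

```python
import numbers
from typing import Any, List, Optional


def check_column_type(values: List[Any], output_type: Optional[type] = None) -> type:
    """Infer the column type from the first value unless `output_type` is
    given, then verify that every value fits that type."""
    expected = output_type if output_type is not None else type(values[0])
    for i, value in enumerate(values):
        if not isinstance(value, expected):
            raise TypeError(
                f"Row {i}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return expected


values = [1, 2, 3.5]  # the first row is an int, a later row is a float
try:
    check_column_type(values)
except TypeError as err:
    print(err)  # Row 2: expected int, got float

# Declaring a broader type up front accepts every row.
print(check_column_type(values, output_type=numbers.Number).__name__)  # Number
```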
Deferred map and chaining¶
In this section, we discuss how we can chain together multiple map operations using deferred maps. This produces a chain of operations that can be executed together. In the following section, we’ll discuss how we can pipeline chained operations to take advantage of parallelism.
Note
If you’re unfamiliar with DeferredColumns, you may want to read the guide on Deferred Columns before diving into this section.
Simple deferred map¶
In addition to map() described above, Meerkat provides defer(), which creates a DeferredColumn representing a deferred map. The two functions share nearly the same signature, so everything discussed in the previous section about multiple inputs and outputs also applies to defer(). The difference is that defer() returns a column that has not yet been computed: a placeholder for a column that will be computed later.
To demonstrate, let’s repeat the example above, this time using a deferred map to create a 2-step chain of operations.
# no computation yet
df["age"] = df["birth_year"].defer(
lambda x: datetime.datetime.now().year - x
)
# computation is done here
df["ma_eligible"] = df.map(
lambda age, residence: (residence == "MA") and (age >= 18)
)
df
The only difference between this code and the code in the previous section is that here we use defer() instead of map() when creating the age column. The result is the same, but here the computation of both "age" and "ma_eligible" is performed together at the end, instead of in two stages.
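To make the idea of a placeholder concrete, here is a hypothetical LazyColumn class (not Meerkat's DeferredColumn) that stores a source and a function and computes nothing until it is materialized. Chaining a second deferred step onto it runs both steps together, row by row, rather than in two separate passes. For simplicity, this sketch processes one row at a time; it makes no claim about how Meerkat schedules the work.

```python
from typing import Any, Callable, Iterable, Iterator, List


class LazyColumn:
    """Hypothetical stand-in for a deferred column: it stores a source and a
    function, and computes nothing until it is iterated or materialized."""

    def __init__(self, source: Iterable[Any], function: Callable[[Any], Any]):
        self.source = source  # a list or another LazyColumn
        self.function = function

    def defer(self, function: Callable[[Any], Any]) -> "LazyColumn":
        # Chaining: the new column reads lazily from this one.
        return LazyColumn(self, function)

    def __iter__(self) -> Iterator[Any]:
        # Values flow through the whole chain one row at a time.
        for value in self.source:
            yield self.function(value)

    def materialize(self) -> List[Any]:
        return list(self)


log: List[str] = []


def to_age(year: int) -> int:
    log.append(f"age({year})")
    return 2023 - year


def is_adult(age: int) -> bool:
    log.append(f"adult({age})")
    return age >= 18


adults = LazyColumn([1967, 2010], to_age).defer(is_adult)
print(log)  # [] (nothing has been computed yet)
print(adults.materialize())  # [True, False]
print(log)  # ['age(1967)', 'adult(56)', 'age(2010)', 'adult(13)']
```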
Chaining deferred maps¶
Let’s motivate the use of deferred maps with a more involved example: processing a dataset of images. We’re going to use the Imagenette dataset, a small subset of the original ImageNet. We can load it from the Meerkat dataset registry with the get() function. This dataset is made up of 10 classes (e.g. “garbage truck”, “gas pump”, “golf ball”), but we’ll focus on a simpler binary classification task: “parachute” vs. “golf ball”.
df = mk.get("imagenette")
df = df[df["label"].isin(["parachute", "golf ball"])].sample(500)
df[["img", "label", "path"]]
Below, we’ve defined a classifier with a silly decision rule: it classifies an image as containing a parachute if the blue channel has the highest value in more than half of the pixels (the logic being that parachutes are often photographed against the sky).
The classifier has two methods which we need to chain together: preprocess and predict. The preprocess method takes an image and returns a NumPy array of shape (224, 224, 3). The predict method takes a batch of preprocessed images, an array of shape (batch_size, 224, 224, 3), and returns a boolean array with one prediction per image.
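from PIL.Image import Image
import numpy as np

class ParachuteClassifier:
    def preprocess(self, img: Image) -> np.ndarray:
        """Prepare an image for classification."""
        # Convert to 3-channel RGB and resize to a fixed 224x224 resolution.
        return np.array(img.convert("RGB").resize((224, 224)))

    def predict(self, batch: np.ndarray) -> np.ndarray:
        """Classify a batch of images as containing a parachute or not, using a
        simple decision rule.
        """
        # batch has shape (N, 224, 224, 3). For each pixel, check whether the
        # blue channel (index 2) is the largest, then average over height and
        # width to get the fraction of "blue" pixels in each image.
        return (np.argmax(batch, axis=3) == 2).mean(axis=1).mean(axis=1) > 0.5

classifier = ParachuteClassifier()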
Because only one of the two methods is batched, chaining them correctly is a bit tricky (one approach might involve a double for-loop). Fortunately, with deferred maps, we can chain together functions that use different batching strategies. First, we create a deferred column that applies preprocess to each image. By default, is_batched_fn=False, so the preprocess method is applied to each image individually. Next, we map the predict method over the deferred column. Because predict is batched, we pass is_batched_fn=True and specify a batch_size to indicate that predict should be applied to batches of images.
preprocessed = df["img"].defer(classifier.preprocess)
df["prediction"] = preprocessed.map(
classifier.predict, is_batched_fn=True, batch_size=32
)
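The pattern above, a per-item function feeding a batched function, can be sketched in plain Python with generators. The names batched, chain_row_then_batch, and predict_even are hypothetical stand-ins: row_fn plays the role of preprocess and batch_fn plays the role of predict. This shows the general technique, not how Meerkat implements it.

```python
from typing import Any, Callable, Iterable, Iterator, List


def batched(items: Iterable[Any], batch_size: int) -> Iterator[List[Any]]:
    """Group a stream of items into lists of at most `batch_size` items."""
    batch: List[Any] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # the final, possibly smaller batch
        yield batch


def chain_row_then_batch(
    items: Iterable[Any],
    row_fn: Callable[[Any], Any],
    batch_fn: Callable[[List[Any]], List[Any]],
    batch_size: int,
) -> List[Any]:
    """Apply `row_fn` to each item lazily, then `batch_fn` to batches of the
    results, flattening the batched outputs into one list."""
    preprocessed = (row_fn(item) for item in items)  # a generator: nothing runs yet
    results: List[Any] = []
    for batch in batched(preprocessed, batch_size):
        results.extend(batch_fn(batch))
    return results


batch_sizes: List[int] = []


def predict_even(batch: List[int]) -> List[bool]:
    batch_sizes.append(len(batch))  # record how many items each call receives
    return [value % 2 == 0 for value in batch]


predictions = chain_row_then_batch(range(10), lambda x: x * x, predict_even, batch_size=4)
print(predictions)  # [True, False, True, False, True, False, True, False, True, False]
print(batch_sizes)  # [4, 4, 2]
```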
Finally, we can compute the accuracy of the classifier by mapping a simple function that compares the predictions to the ground-truth labels.
accuracy = df.map(lambda prediction, label: prediction == (label == "parachute")).mean()
print(f"Accuracy: {accuracy:.2%}")
Not too bad! I guess you can get pretty far in machine learning relying on spurious correlations.
We could have chained all of these operations together into a single chain.
preprocessed = df["img"].defer(classifier.preprocess)
df["prediction"] = preprocessed.defer(
classifier.predict, is_batched_fn=True, batch_size=32
)
accuracy = df.map(lambda prediction, label: prediction == (label == "parachute")).mean()
Here’s a trick question: how long is the resulting chain? At first glance, it seems like the chain is three maps long: preprocess, predict, then accuracy. However, recall from the guide on Deferred Columns that images and other complex data types are typically stored in DeferredColumns; that is, the "img" column is itself a deferred map. This map applies an image loading function to each filepath in the dataset. It could have been created with a line like df["img"] = df["filepath"].defer(load_image). Because our first defer() call in the cell above was made on the "img" column, the resulting chain is actually four maps long: load, preprocess, predict, then accuracy.
Why not just use multiple map calls? Chaining together deferred maps has two main advantages over simply calling map multiple times.
Memory. If one of the intermediate maps produces images or other large data types (as preprocess does in this example), then the resulting column may not fit in memory. With a regular map, that entire column is materialized before the next map begins. If we use a deferred map, then intermediate results are released from memory once they are consumed by the next map in the chain. This enables us to process data types that are too large to fit in memory (e.g. images, video, audio).
Parallelism. Using deferred maps allows us to better take advantage of the parallelism afforded by our system, especially if each map in the chain depends on different system resources (e.g. CPU vs. GPU vs. I/O bandwidth). Meerkat supports pipelining chained maps (currently an experimental feature), which allows us to run multiple maps in parallel. We discuss this in the next section.
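The memory argument can be made concrete with a plain-Python sketch that counts how many intermediate results exist at once. eager_pipeline mimics two separate map calls (the whole intermediate column is built first), while streaming_pipeline mimics a deferred chain that consumes intermediates batch by batch and then drops them. Both helpers and make_large are hypothetical; the counts are instrumentation, not a memory profiler.

```python
from typing import Any, Callable, Iterable, List, Tuple


def eager_pipeline(
    items: Iterable[Any], stage1: Callable[[Any], Any], stage2: Callable[[Any], Any]
) -> Tuple[List[Any], int]:
    """Like two separate map calls: all stage-1 outputs exist at once.
    Returns the results and the peak number of intermediates held."""
    intermediates = [stage1(x) for x in items]  # the whole column in memory
    return [stage2(y) for y in intermediates], len(intermediates)


def streaming_pipeline(
    items: Iterable[Any],
    stage1: Callable[[Any], Any],
    stage2: Callable[[Any], Any],
    batch_size: int,
) -> Tuple[List[Any], int]:
    """Like a chain of deferred maps: stage-1 outputs are consumed batch by
    batch and dropped. Returns the results and the peak intermediates held."""
    results: List[Any] = []
    buffer: List[Any] = []
    peak = 0
    for x in items:
        buffer.append(stage1(x))
        peak = max(peak, len(buffer))
        if len(buffer) == batch_size:
            results.extend(stage2(y) for y in buffer)
            buffer = []  # release the consumed intermediates
    results.extend(stage2(y) for y in buffer)
    return results, peak


def make_large(x: int) -> List[int]:
    # Stands in for a step with large outputs (e.g. preprocessed images).
    return [x] * 1_000


eager_results, eager_peak = eager_pipeline(range(500), make_large, sum)
stream_results, stream_peak = streaming_pipeline(range(500), make_large, sum, batch_size=32)
print(eager_results == stream_results, eager_peak, stream_peak)  # True 500 32
```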
Pipelining and Parallelism¶
Because map applies the same function to each row, it is a delightfully parallelizable operation. In this section, we discuss how to parallelize maps and how to pipeline a chain of parallel maps.
Danger
WIP: Pipelining is currently an experimental feature. It will be implemented using Ray.
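Because each row is processed independently, a map can be split into chunks that run concurrently. The sketch below illustrates this with the standard library's concurrent.futures rather than Ray, and it is not Meerkat's API; parallel_map is a hypothetical helper. Executor.map returns chunk results in input order, so the output keeps the original row order.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


def parallel_map(
    function: Callable[[Any], Any],
    values: Sequence[Any],
    num_workers: int = 4,
    chunk_size: int = 100,
) -> List[Any]:
    """Split `values` into chunks and apply `function` to each chunk in a
    thread pool. Rows are independent, so chunks can run concurrently;
    executor.map yields chunk results in input order, so row order is kept."""
    chunks = [values[i : i + chunk_size] for i in range(0, len(values), chunk_size)]

    def apply_to_chunk(chunk: Sequence[Any]) -> List[Any]:
        return [function(value) for value in chunk]

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        return [result for chunk in executor.map(apply_to_chunk, chunks) for result in chunk]


doubled = parallel_map(lambda x: 2 * x, list(range(1_000)))
print(doubled[:5], len(doubled))  # [0, 2, 4, 6, 8] 1000
```

Threads mainly help when the mapped function waits on I/O. For CPU-bound pure-Python work, ProcessPoolExecutor is the usual standard-library alternative, although it requires picklable functions (so no lambdas or nested functions).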