Skip to content

Add support for new pandas UDF engine #418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/blosc2/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,3 +676,58 @@ def wrapper(*args, **func_kwargs):
return decorator
else:
return decorator(func)


class PandasUdfEngine:
@staticmethod
def _ensure_numpy_data(data):
if not isinstance(data, np.ndarray):
try:
data = data.values
except AttributeError as err:
raise ValueError(
"blosc2.jit received an object of type {data.__name__}, which is not supported. "
"Try casting your Series or DataFrame to a NumPy dtype."
) from err
return data

@classmethod
def map(cls, data, func, args, kwargs, decorator, skip_na):
"""
JIT a NumPy array element-wise. In the case of Blosc2, functions are
expected to be vectorized NumPy operations, so the function is called
with the NumPy array as the function parameter, instead of calling the
function once for each element.
"""
raise NotImplementedError("The Blosc2 engine does not support map. Use apply instead.")

@classmethod
def apply(cls, data, func, args, kwargs, decorator, axis):
"""
JIT a NumPy array by column or row. In the case of Blosc2, functions are
expected to be vectorized NumPy operations, so the function is called
with the NumPy array as the function parameter, instead of calling the
function once for each column or row.
"""
data = cls._ensure_numpy_data(data)
func = decorator(func)
if data.ndim == 1 or axis is None:
# pandas Series.apply or pipe
return func(data, *args, **kwargs)
elif axis in (0, "index"):
# pandas apply(axis=0) column-wise
result = []
for row_idx in range(data.shape[1]):
result.append(func(data[:, row_idx], *args, **kwargs))
return np.vstack(result).transpose()
elif axis in (1, "columns"):
# pandas apply(axis=1) row-wise
result = []
for col_idx in range(data.shape[0]):
result.append(func(data[col_idx, :], *args, **kwargs))
return np.vstack(result)
else:
raise NotImplementedError(f"Unknown axis '{axis}'. Use one of 0, 1 or None.")


jit.__pandas_udf__ = PandasUdfEngine
120 changes: 120 additions & 0 deletions tests/test_pandas_udf_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <[email protected]>
# All rights reserved.
#
# This source code is licensed under a BSD-style license (found in the
# LICENSE file in the root directory of this source tree)
#######################################################################

import numpy as np
import pytest

import blosc2


class TestPandasUDF:
def test_map(self):
def add_one(x):
return x + 1

data = np.array([1, 2])

with pytest.raises(NotImplementedError):
blosc2.jit.__pandas_udf__.map(
data,
add_one,
args=(),
kwargs={},
decorator=blosc2.jit,
skip_na=False,
)

def test_apply_1d(self):
def add_one(x):
return x + 1

data = np.array([1, 2])

result = blosc2.jit.__pandas_udf__.apply(
data,
add_one,
args=(),
kwargs={},
decorator=blosc2.jit,
axis=0,
)
assert result.shape == (2,)
assert result[0] == 2
assert result[1] == 3

def test_apply_1d_with_args(self):
def add_numbers(x, num1, num2):
return x + num1 + num2

data = np.array([1, 2])

result = blosc2.jit.__pandas_udf__.apply(
data,
add_numbers,
args=(10,),
kwargs={"num2": 100},
decorator=blosc2.jit,
axis=0,
)
assert result.shape == (2,)
assert result[0] == 111
assert result[1] == 112

def test_apply_2d(self):
def add_one(x):
assert x.shape == (2, 3)
return x + 1

data = np.array([[1, 2, 3], [4, 5, 6]])

result = blosc2.jit.__pandas_udf__.apply(
data,
add_one,
args=(),
kwargs={},
decorator=blosc2.jit,
axis=None,
)
expected = np.array([[2, 3, 4], [5, 6, 7]])
assert np.array_equal(result, expected)

def test_apply_2d_by_column(self):
def add_one(x):
assert x.shape == (2,)
return x + 1

data = np.array([[1, 2, 3], [4, 5, 6]])

result = blosc2.jit.__pandas_udf__.apply(
data,
add_one,
args=(),
kwargs={},
decorator=blosc2.jit,
axis=0,
)
expected = np.array([[2, 3, 4], [5, 6, 7]])
assert np.array_equal(result, expected)

def test_apply_2d_by_row(self):
def add_one(x):
assert x.shape == (3,)
return x + 1

data = np.array([[1, 2, 3], [4, 5, 6]])

result = blosc2.jit.__pandas_udf__.apply(
data,
add_one,
args=(),
kwargs={},
decorator=blosc2.jit,
axis=1,
)
expected = np.array([[2, 3, 4], [5, 6, 7]])
assert np.array_equal(result, expected)