Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bodo/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from numba.extending import lower_builtin, models, register_model

import bodo
from bodo.pandas_compat import bodo_pandas_udf_execution_engine

# Add Bodo's options to Numba's allowed options/flags
numba.core.cpu.CPUTargetOptions.all_args_distributed_block = _mapping(
Expand Down Expand Up @@ -350,6 +351,8 @@ def return_wrapped_fn(py_func):
py_func = signature_or_function
return return_wrapped_fn(py_func)

return_wrapped_fn.__pandas_udf__ = bodo_pandas_udf_execution_engine

return return_wrapped_fn

elif "propagate_env" in options:
Expand All @@ -360,6 +363,9 @@ def return_wrapped_fn(py_func):
return _jit(signature_or_function, pipeline_class, **options)


jit.__pandas_udf__ = bodo_pandas_udf_execution_engine


def _jit(signature_or_function=None, pipeline_class=None, **options):
_init_extensions()

Expand Down
50 changes: 50 additions & 0 deletions bodo/pandas_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,53 @@ def get_conversion_factor_to_ns(in_reso: str) -> int:
else:
raise ValueError(f"Unsupported resolution {in_reso}")
return factor * value


# Class responsible for executing UDFs using Bodo as the engine in
# newer version of pandas.
# https://github.com/pandas-dev/pandas/pull/61032
bodo_pandas_udf_execution_engine = None

if pandas_version >= (3, 0):
from collections.abc import Callable
from typing import Any

from pandas._typing import AggFuncType, Axis
from pandas.core.apply import BaseExecutionEngine

class BodoExecutionEngine(BaseExecutionEngine):
@staticmethod
def map(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not implemented yet in Pandas so there is no way to test. Leaving as a followup

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll open a PR adding engine to Series.map shortly. I have the implementation already, but map and apply tests are structured very differently, and I need to see how to implement the common fixtures without making the mess bigger or refactoring all the tests.

data: pd.Series | pd.DataFrame | np.ndarray,
func: AggFuncType,
args: tuple,
kwargs: dict[str, Any],
decorator: Callable | None,
skip_na: bool,
):
raise NotImplementedError("TODO: map")

@staticmethod
def apply(
data: pd.Series | pd.DataFrame | np.ndarray,
func: AggFuncType,
args: tuple,
kwargs: dict[str, Any],
decorator: Callable,
axis: Axis,
):
# raw = True converts data to ndarray first
if isinstance(data, np.ndarray):
raise ValueError(
"Bodo engine does not support the raw=True in DataFrame.apply."
)

jitted_func = decorator(func)

@decorator
def apply_func(data):
return data.apply(jitted_func, axis=axis, args=args)

return apply_func(data)

bodo_pandas_udf_execution_engine = BodoExecutionEngine
32 changes: 32 additions & 0 deletions bodo/tests/test_pandas_udf_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
This file tests the Bodo implementation of the Pandas UDF interface.
See https://github.com/pandas-dev/pandas/pull/61032 for more details.

This feature is only availible on newer versions of Pandas (>=3.0)
"""

import numpy as np
import pandas as pd
import pytest

import bodo
from bodo.pandas_compat import pandas_version
from bodo.tests.utils import _test_equal, pytest_spawn_mode

pytestmark = [
pytest.mark.skipif(
pandas_version < (3, 0), reason="Third-party UDF engines requires Pandas >= 3.0"
)
] + pytest_spawn_mode


def test_basic_apply_example():
"""Simplest test to check Pandas UDF apply hook is set up properly"""

df = pd.DataFrame({"A": np.arange(30)})

bodo_result = df.apply(lambda x: x.A, axis=1, engine=bodo.jit)

pandas_result = df.apply(lambda x: x.A, axis=1)

_test_equal(bodo_result, pandas_result, check_pandas_types=False)
10 changes: 0 additions & 10 deletions pixi.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ pip = "*"
# Core Python Deps
numba = "==0.61.0"
numpy = ">=1.24,<2.2"
pandas = ">=2.2,<2.3"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For testing locally, will restore before merging.

pyarrow = { version = "==19.0.0", channel = "conda-forge" }
fsspec = ">=2021.09"
requests = "*"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ classifiers = [
dependencies = [
"numba==0.61.0",
"pyarrow==19.0.0",
"pandas>=2.2,<2.3",
# "pandas>=2.2,<2.3",
"numpy>=1.24,<2.2",
# fsspec >= 2021.09 because it includes Arrow filesystem wrappers (useful for fs.glob() for example)
"fsspec>=2021.09",
Expand Down
Loading