diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index 57d9a549e..e14d89f02 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -37,6 +37,7 @@ from sdc.datatypes.common_functions import _sdc_pandas_series_align from sdc.datatypes.hpat_pandas_series_rolling_types import SeriesRollingType from sdc.hiframes.pd_series_type import SeriesType +from sdc.utilities.prange_utils import parallel_chunks from sdc.utilities.sdc_typing_utils import TypeChecker from sdc.utilities.utils import sdc_overload_method, sdc_register_jitable @@ -214,12 +215,6 @@ def arr_std(arr, ddof): return arr_var(arr, ddof) ** 0.5 -@sdc_register_jitable -def arr_sum(arr): - """Calculate sum of values""" - return arr.sum() - - @sdc_register_jitable def arr_var(arr, ddof): """Calculate unbiased variance of values""" @@ -308,12 +303,87 @@ def apply_minp(arr, ddof, minp): gen_hpat_pandas_series_rolling_impl(arr_skew)) hpat_pandas_rolling_series_std_impl = register_jitable( gen_hpat_pandas_series_rolling_ddof_impl(arr_std)) -hpat_pandas_rolling_series_sum_impl = register_jitable( - gen_hpat_pandas_series_rolling_impl(arr_sum)) hpat_pandas_rolling_series_var_impl = register_jitable( gen_hpat_pandas_series_rolling_ddof_impl(arr_var)) +@sdc_register_jitable +def pop_sum(value, nfinite, result): + """Calculate the window sum without old value.""" + if numpy.isfinite(value): + nfinite -= 1 + result -= value + + return nfinite, result + + +@sdc_register_jitable +def put_sum(value, nfinite, result): + """Calculate the window sum with new value.""" + if numpy.isfinite(value): + nfinite += 1 + result += value + + return nfinite, result + + +@sdc_register_jitable +def result_or_nan(nfinite, minp, result): + """Get result taking into account min periods.""" + if nfinite < minp: + return numpy.nan + + return result + + +def gen_sdc_pandas_series_rolling_impl(pop, put, init_result=numpy.nan): + """Generate series rolling methods implementations based on pop/put funcs""" + def impl(self): + win = self._window + minp = self._min_periods + + input_series = self._data + input_arr = input_series._data + length = len(input_arr) + output_arr = numpy.empty(length, dtype=float64) + + chunks = parallel_chunks(length) + for i in prange(len(chunks)): + chunk = chunks[i] + nfinite = 0 + result = init_result + + prelude_start = max(0, chunk.start - win + 1) + prelude_stop = min(chunk.start, prelude_start + win) + + interlude_start = prelude_stop + interlude_stop = min(prelude_start + win, chunk.stop) + + for idx in range(prelude_start, prelude_stop): + value = input_arr[idx] + nfinite, result = put(value, nfinite, result) + + for idx in range(interlude_start, interlude_stop): + value = input_arr[idx] + nfinite, result = put(value, nfinite, result) + output_arr[idx] = result_or_nan(nfinite, minp, result) + + for idx in range(interlude_stop, chunk.stop): + put_value = input_arr[idx] + pop_value = input_arr[idx - win] + nfinite, result = put(put_value, nfinite, result) + nfinite, result = pop(pop_value, nfinite, result) + output_arr[idx] = result_or_nan(nfinite, minp, result) + + return pandas.Series(output_arr, input_series._index, + name=input_series._name) + return impl + + +sdc_pandas_series_rolling_sum_impl = register_jitable( + gen_sdc_pandas_series_rolling_impl(pop_sum, put_sum, init_result=0.)) + + @sdc_rolling_overload(SeriesRollingType, 'apply') def hpat_pandas_series_rolling_apply(self, func, raw=None): @@ -619,13 +689,13 @@ def hpat_pandas_series_rolling_std(self, ddof=1): return hpat_pandas_rolling_series_std_impl -@sdc_rolling_overload(SeriesRollingType, 'sum') +@sdc_overload_method(SeriesRollingType, 'sum') def hpat_pandas_series_rolling_sum(self): ty_checker = TypeChecker('Method rolling.sum().') ty_checker.check(self, SeriesRollingType) - return hpat_pandas_rolling_series_sum_impl + return sdc_pandas_series_rolling_sum_impl @sdc_rolling_overload(SeriesRollingType, 'var') diff --git a/sdc/tests/test_rolling.py b/sdc/tests/test_rolling.py index 9d3acd92e..73984c09b 100644 --- a/sdc/tests/test_rolling.py +++ b/sdc/tests/test_rolling.py @@ -847,8 +847,8 @@ def test_impl(obj, window, min_periods): hpat_func = self.jit(test_impl) assert_equal = self._get_assert_equal(obj) - for window in range(0, len(obj) + 3, 2): - for min_periods in range(0, window + 1, 2): + for window in range(len(obj) + 2): + for min_periods in range(window): with self.subTest(obj=obj, window=window, min_periods=min_periods): jit_result = hpat_func(obj, window, min_periods) diff --git a/sdc/tests/tests_perf/test_perf_series_rolling.py b/sdc/tests/tests_perf/test_perf_series_rolling.py index a6c988882..a38bbe1a9 100644 --- a/sdc/tests/tests_perf/test_perf_series_rolling.py +++ b/sdc/tests/tests_perf/test_perf_series_rolling.py @@ -24,21 +24,29 @@ # OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # ***************************************************************************** -import string + import time -import numba import pandas import numpy as np from sdc.tests.test_utils import test_global_input_data_float64 from sdc.tests.tests_perf.test_perf_base import TestBase -from sdc.tests.tests_perf.test_perf_utils import (calc_compilation, get_times, - perf_data_gen_fixed_len) +from sdc.tests.tests_perf.test_perf_utils import perf_data_gen_fixed_len from .generator import generate_test_cases from .generator import TestCase as TC +rolling_usecase_tmpl = """ +def series_rolling_{method_name}_usecase(data, {extra_usecase_params}): + start_time = time.time() + for i in range({ncalls}): + res = data.rolling({rolling_params}).{method_name}({method_params}) + end_time = time.time() + return end_time - start_time, res +""" + + def get_rolling_params(window=100, min_periods=None): """Generate supported rolling parameters""" rolling_params = [f'{window}'] @@ -48,14 +56,37 @@ def get_rolling_params(window=100, min_periods=None): return ', '.join(rolling_params) +def gen_series_rolling_usecase(method_name, rolling_params=None, + extra_usecase_params='', + method_params='', ncalls=1): + """Generate series rolling method use case""" + if not rolling_params: + rolling_params = get_rolling_params() + + func_text = rolling_usecase_tmpl.format(**{ + 'method_name': method_name, + 'extra_usecase_params': extra_usecase_params, + 'rolling_params': rolling_params, + 'method_params': method_params, + 'ncalls': ncalls + }) + + global_vars = {'np': np, 'time': time} + loc_vars = {} + exec(func_text, global_vars, loc_vars) + _series_rolling_usecase = loc_vars[f'series_rolling_{method_name}_usecase'] + + return _series_rolling_usecase + + # python -m sdc.runtests sdc.tests.tests_perf.test_perf_series_rolling.TestSeriesRollingMethods class TestSeriesRollingMethods(TestBase): - # more than 19 columns raise SystemError: CPUDispatcher() returned a result with an error set - max_columns_num = 19 - @classmethod def setUpClass(cls): super().setUpClass() + cls.map_ncalls_dlength = { + 'sum': (100, [8 * 10 ** 5]), + } def _test_case(self, pyfunc, name, total_data_length, data_num=1, input_data=test_global_input_data_float64): @@ -82,6 +113,20 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1, self._test_jit(pyfunc, base, *args) self._test_py(pyfunc, base, *args) + def _test_series_rolling_method(self, name, rolling_params=None, + extra_usecase_params='', method_params=''): + ncalls, total_data_length = self.map_ncalls_dlength[name] + usecase = gen_series_rolling_usecase(name, rolling_params=rolling_params, + extra_usecase_params=extra_usecase_params, + method_params=method_params, ncalls=ncalls) + data_num = 1 + if extra_usecase_params: + data_num += len(extra_usecase_params.split(', ')) + self._test_case(usecase, name, total_data_length, data_num=data_num) + + def test_series_rolling_sum(self): + self._test_series_rolling_method('sum') + cases = [ TC(name='apply', size=[10 ** 7], params='func=lambda x: np.nan if len(x) == 0 else x.mean()'), @@ -96,7 +141,6 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1, TC(name='quantile', size=[10 ** 7], params='0.2'), TC(name='skew', size=[10 ** 7]), TC(name='std', size=[10 ** 7]), - TC(name='sum', size=[10 ** 7]), TC(name='var', size=[10 ** 7]), ] diff --git a/sdc/utilities/prange_utils.py b/sdc/utilities/prange_utils.py index 4a44fe249..150e72734 100644 --- a/sdc/utilities/prange_utils.py +++ b/sdc/utilities/prange_utils.py @@ -29,7 +29,7 @@ import sdc from typing import NamedTuple -from sdc.utilities.utils import sdc_overload, sdc_register_jitable +from sdc.utilities.utils import sdc_register_jitable class Chunk(NamedTuple):