From b2a4d9d543da2ce80cafc1cc49e043322607d70d Mon Sep 17 00:00:00 2001 From: Denis Date: Sun, 16 Feb 2020 19:26:54 +0300 Subject: [PATCH 01/12] Optimize series.rolling.sum() --- .../hpat_pandas_series_rolling_functions.py | 58 ++++++++++++++++--- sdc/tests/test_rolling.py | 4 +- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index 57d9a549e..c2793df57 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -214,12 +214,6 @@ def arr_std(arr, ddof): return arr_var(arr, ddof) ** 0.5 -@sdc_register_jitable -def arr_sum(arr): - """Calculate sum of values""" - return arr.sum() - - @sdc_register_jitable def arr_var(arr, ddof): """Calculate unbiased variance of values""" @@ -308,8 +302,6 @@ def apply_minp(arr, ddof, minp): gen_hpat_pandas_series_rolling_impl(arr_skew)) hpat_pandas_rolling_series_std_impl = register_jitable( gen_hpat_pandas_series_rolling_ddof_impl(arr_std)) -hpat_pandas_rolling_series_sum_impl = register_jitable( - gen_hpat_pandas_series_rolling_impl(arr_sum)) hpat_pandas_rolling_series_var_impl = register_jitable( gen_hpat_pandas_series_rolling_ddof_impl(arr_var)) @@ -625,7 +617,55 @@ def hpat_pandas_series_rolling_sum(self): ty_checker = TypeChecker('Method rolling.sum().') ty_checker.check(self, SeriesRollingType) - return hpat_pandas_rolling_series_sum_impl + def _sdc_pandas_series_rolling_sum_impl(self): + win = self._window + minp = self._min_periods + + input_series = self._data + input_arr = input_series._data + length = len(input_arr) + output_arr = numpy.empty(length, dtype=float64) + + nfinite = 0 + current_result = 0. + boundary = min(win, length) + for i in range(boundary): + value = input_arr[i] + if numpy.isfinite(value): + nfinite += 1 + current_result += value + + if nfinite < minp: + output_arr[i] = numpy.nan + else: + output_arr[i] = current_result + + start_indices = range(length - boundary) + end_indices = range(boundary, length) + for start_idx, end_idx in zip(start_indices, end_indices): + if start_idx == end_idx: + # case when window == 0 + output_arr[end_idx] = current_result + continue + + first_val = input_arr[start_idx] + last_val = input_arr[end_idx] + + if numpy.isfinite(first_val): + nfinite -= 1 + current_result -= first_val + if numpy.isfinite(last_val): + nfinite += 1 + current_result += last_val + + if nfinite < minp: + output_arr[end_idx] = numpy.nan + else: + output_arr[end_idx] = current_result + + return pandas.Series(output_arr, input_series._index, name=input_series._name) + + return _sdc_pandas_series_rolling_sum_impl @sdc_rolling_overload(SeriesRollingType, 'var') diff --git a/sdc/tests/test_rolling.py b/sdc/tests/test_rolling.py index 9d3acd92e..73984c09b 100644 --- a/sdc/tests/test_rolling.py +++ b/sdc/tests/test_rolling.py @@ -847,8 +847,8 @@ def test_impl(obj, window, min_periods): hpat_func = self.jit(test_impl) assert_equal = self._get_assert_equal(obj) - for window in range(0, len(obj) + 3, 2): - for min_periods in range(0, window + 1, 2): + for window in range(len(obj) + 2): + for min_periods in range(window): with self.subTest(obj=obj, window=window, min_periods=min_periods): jit_result = hpat_func(obj, window, min_periods) From c22c208db2fbf401415c93265982af442db9dde2 Mon Sep 17 00:00:00 2001 From: Denis Date: Mon, 17 Feb 2020 19:05:39 +0300 Subject: [PATCH 02/12] Enable scalability for series.rolling.sum --- .../hpat_pandas_series_rolling_functions.py | 88 +++++++++++-------- .../tests_perf/test_perf_series_rolling.py | 2 +- sdc/utilities/prange_utils.py | 17 +--- 3 files changed, 56 insertions(+), 51 deletions(-) diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index c2793df57..1a133bb97 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -37,6 +37,7 @@ from sdc.datatypes.common_functions import _sdc_pandas_series_align from sdc.datatypes.hpat_pandas_series_rolling_types import SeriesRollingType from sdc.hiframes.pd_series_type import SeriesType +from sdc.utilities.prange_utils import get_chunks from sdc.utilities.sdc_typing_utils import TypeChecker from sdc.utilities.utils import sdc_overload_method, sdc_register_jitable @@ -626,42 +627,57 @@ def _sdc_pandas_series_rolling_sum_impl(self): length = len(input_arr) output_arr = numpy.empty(length, dtype=float64) - nfinite = 0 - current_result = 0. - boundary = min(win, length) - for i in range(boundary): - value = input_arr[i] - if numpy.isfinite(value): - nfinite += 1 - current_result += value - - if nfinite < minp: - output_arr[i] = numpy.nan - else: - output_arr[i] = current_result - - start_indices = range(length - boundary) - end_indices = range(boundary, length) - for start_idx, end_idx in zip(start_indices, end_indices): - if start_idx == end_idx: - # case when window == 0 - output_arr[end_idx] = current_result - continue - - first_val = input_arr[start_idx] - last_val = input_arr[end_idx] - - if numpy.isfinite(first_val): - nfinite -= 1 - current_result -= first_val - if numpy.isfinite(last_val): - nfinite += 1 - current_result += last_val - - if nfinite < minp: - output_arr[end_idx] = numpy.nan - else: - output_arr[end_idx] = current_result + chunks = get_chunks(length) + for i in prange(len(chunks)): + chunk = chunks[i] + + start = max(chunk.start - (win - 1), 0) + win_length = chunk.stop - start + + nfinite = 0 + current_result = 0. + boundary = start + min(win, win_length) + for idx in range(start, boundary): + value = input_arr[idx] + if numpy.isfinite(value): + nfinite += 1 + current_result += value + + if idx < chunk.start: + continue + + if nfinite < minp: + output_arr[idx] = numpy.nan + else: + output_arr[idx] = current_result + + start_indices = range(start, start + chunk.stop - boundary) + end_indices = range(boundary, chunk.stop) + for start_idx, end_idx in zip(start_indices, end_indices): + if start_idx == end_idx: + # case when window == 0 + if end_idx < chunk.start: + continue + output_arr[end_idx] = current_result + continue + + excluded_val = input_arr[start_idx] + included_val = input_arr[end_idx] + + if numpy.isfinite(excluded_val): + nfinite -= 1 + current_result -= excluded_val + if numpy.isfinite(included_val): + nfinite += 1 + current_result += included_val + + if end_idx < chunk.start: + continue + + if nfinite < minp: + output_arr[end_idx] = numpy.nan + else: + output_arr[end_idx] = current_result return pandas.Series(output_arr, input_series._index, name=input_series._name) diff --git a/sdc/tests/tests_perf/test_perf_series_rolling.py b/sdc/tests/tests_perf/test_perf_series_rolling.py index a6c988882..5eb5743bd 100644 --- a/sdc/tests/tests_perf/test_perf_series_rolling.py +++ b/sdc/tests/tests_perf/test_perf_series_rolling.py @@ -96,7 +96,7 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1, TC(name='quantile', size=[10 ** 7], params='0.2'), TC(name='skew', size=[10 ** 7]), TC(name='std', size=[10 ** 7]), - TC(name='sum', size=[10 ** 7]), + TC(name='sum', size=[4 * 10 ** 7]), TC(name='var', size=[10 ** 7]), ] diff --git a/sdc/utilities/prange_utils.py b/sdc/utilities/prange_utils.py index 2380bb513..49da35249 100644 --- a/sdc/utilities/prange_utils.py +++ b/sdc/utilities/prange_utils.py @@ -29,7 +29,7 @@ import sdc from typing import NamedTuple -from sdc.utilities.utils import sdc_overload +from sdc.utilities.utils import sdc_overload, sdc_register_jitable class Chunk(NamedTuple): @@ -54,6 +54,7 @@ def get_pool_size_impl(): return get_pool_size_impl +@sdc_register_jitable def get_chunks(size, pool_size=0): if pool_size == 0: pool_size = get_pool_size() @@ -72,18 +73,6 @@ def get_chunks(size, pool_size=0): @sdc_overload(get_chunks) def get_chunks_overload(size, pool_size=0): def get_chunks_impl(size, pool_size=0): - if pool_size == 0: - pool_size = get_pool_size() - - chunk_size = (size - 1) // pool_size + 1 - - chunks = [] - for i in range(pool_size): - start = min(i * chunk_size, size) - stop = min((i + 1) * chunk_size, size) - chunk = Chunk(start, stop) - chunks.append(chunk) - - return chunks + return get_chunks(size, pool_size=pool_size) return get_chunks_impl From 7c10fcaea0e041cb7e078caf7673213c3057d04a Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Feb 2020 15:03:31 +0300 Subject: [PATCH 03/12] Move rolling part to separate place --- .../hpat_pandas_series_rolling_functions.py | 61 +++--------------- sdc/utilities/window_utils.py | 62 +++++++++++++++++++ 2 files changed, 72 insertions(+), 51 deletions(-) create mode 100644 sdc/utilities/window_utils.py diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index 1a133bb97..7eacd631c 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -29,7 +29,7 @@ from functools import partial -from numba import prange +from numba import objmode, prange from numba.extending import register_jitable from numba.types import (float64, Boolean, Integer, NoneType, Number, Omitted, StringLiteral, UnicodeType) @@ -40,6 +40,7 @@ from sdc.utilities.prange_utils import get_chunks from sdc.utilities.sdc_typing_utils import TypeChecker from sdc.utilities.utils import sdc_overload_method, sdc_register_jitable +from sdc.utilities.window_utils import WindowSum # disabling parallel execution for rolling due to numba issue https://github.com/numba/numba/issues/5098 @@ -630,56 +631,14 @@ def _sdc_pandas_series_rolling_sum_impl(self): chunks = get_chunks(length) for i in prange(len(chunks)): chunk = chunks[i] - - start = max(chunk.start - (win - 1), 0) - win_length = chunk.stop - start - - nfinite = 0 - current_result = 0. - boundary = start + min(win, win_length) - for idx in range(start, boundary): - value = input_arr[idx] - if numpy.isfinite(value): - nfinite += 1 - current_result += value - - if idx < chunk.start: - continue - - if nfinite < minp: - output_arr[idx] = numpy.nan - else: - output_arr[idx] = current_result - - start_indices = range(start, start + chunk.stop - boundary) - end_indices = range(boundary, chunk.stop) - for start_idx, end_idx in zip(start_indices, end_indices): - if start_idx == end_idx: - # case when window == 0 - if end_idx < chunk.start: - continue - output_arr[end_idx] = current_result - continue - - excluded_val = input_arr[start_idx] - included_val = input_arr[end_idx] - - if numpy.isfinite(excluded_val): - nfinite -= 1 - current_result -= excluded_val - if numpy.isfinite(included_val): - nfinite += 1 - current_result += included_val - - if end_idx < chunk.start: - continue - - if nfinite < minp: - output_arr[end_idx] = numpy.nan - else: - output_arr[end_idx] = current_result - - return pandas.Series(output_arr, input_series._index, name=input_series._name) + window = WindowSum(win, minp) + for idx in range(chunk.start, chunk.stop): + window.roll(input_arr, idx) + output_arr[idx] = window.result + window.free() + + return pandas.Series(output_arr, input_series._index, + name=input_series._name) return _sdc_pandas_series_rolling_sum_impl diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py new file mode 100644 index 000000000..87c1718e0 --- /dev/null +++ b/sdc/utilities/window_utils.py @@ -0,0 +1,62 @@ +import numpy as np +from numba import jitclass, njit, types + + +spec = [ + ('size', types.int64), + ('minp', types.int64), + ('_nfinite', types.int64), + ('_nroll', types.int64), + ('_result', types.float64) +] + + +@jitclass(spec) +class WindowSum: + def __init__(self, size, minp): + self.size = size + self.minp = minp + + self._nfinite = 0 + self._nroll = 0 + self._result = 0. + + @property + def result(self): + """Get the latest result taking into account min periods.""" + if self._nfinite < self.minp: + return np.nan + + return self._result + + def roll(self, data, idx): + """Calculate the window sum.""" + if self._nroll >= self.size: + excluded_value = data[idx - self.size] + if np.isfinite(excluded_value): + self._nfinite -= 1 + self._result -= excluded_value + + value = data[idx] + if np.isfinite(value): + self._nfinite += 1 + self._result += value + + self._nroll += 1 + + def free(self): + """Free the window.""" + self._nfinite = 0 + self._nroll = 0 + self._result = 0. + + +if __name__ == '__main__': + @njit + def sum(): + win_sum = WindowSum(3, 2) + data = list(range(5)) # 0, 1, 2, 3, 4 + for i in data: + win_sum.roll(data, i) + print(win_sum.result) # nan, 1.0, 3.0, 6.0, 9.0 + sum() From be0229f86e786c1211ec423681a2843e99d10f60 Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Feb 2020 15:05:23 +0300 Subject: [PATCH 04/12] Fix style issues --- sdc/utilities/window_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py index 87c1718e0..95cfea88c 100644 --- a/sdc/utilities/window_utils.py +++ b/sdc/utilities/window_utils.py @@ -55,8 +55,8 @@ def free(self): @njit def sum(): win_sum = WindowSum(3, 2) - data = list(range(5)) # 0, 1, 2, 3, 4 + data = list(range(5)) # 0, 1, 2, 3, 4 for i in data: win_sum.roll(data, i) - print(win_sum.result) # nan, 1.0, 3.0, 6.0, 9.0 + print(win_sum.result) # nan, 1.0, 3.0, 6.0, 9.0 sum() From b18f5d3743b268ed69f9330fa27feadda98c22fb Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Feb 2020 15:07:06 +0300 Subject: [PATCH 05/12] Minor fixes for series.rolling.sum() --- .../hpat_pandas_series_rolling_functions.py | 2 +- sdc/utilities/window_utils.py | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index 7eacd631c..b8f181572 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -29,7 +29,7 @@ from functools import partial -from numba import objmode, prange +from numba import prange from numba.extending import register_jitable from numba.types import (float64, Boolean, Integer, NoneType, Number, Omitted, StringLiteral, UnicodeType) diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py index 95cfea88c..a5a6590c4 100644 --- a/sdc/utilities/window_utils.py +++ b/sdc/utilities/window_utils.py @@ -1,3 +1,29 @@ +# ***************************************************************************** +# Copyright (c) 2020, Intel Corporation All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# ***************************************************************************** + import numpy as np from numba import jitclass, njit, types From e0d92fdf1e241c33959b8a7a7b6621273316a2d2 Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Feb 2020 15:27:15 +0300 Subject: [PATCH 06/12] Change perf test for series.rolling.sum() --- .../tests_perf/test_perf_series_rolling.py | 62 ++++++++++++++++--- 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/sdc/tests/tests_perf/test_perf_series_rolling.py b/sdc/tests/tests_perf/test_perf_series_rolling.py index 5eb5743bd..31261a7f9 100644 --- a/sdc/tests/tests_perf/test_perf_series_rolling.py +++ b/sdc/tests/tests_perf/test_perf_series_rolling.py @@ -24,21 +24,31 @@ # OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # ***************************************************************************** -import string + import time -import numba import pandas import numpy as np from sdc.tests.test_utils import test_global_input_data_float64 from sdc.tests.tests_perf.test_perf_base import TestBase -from sdc.tests.tests_perf.test_perf_utils import (calc_compilation, get_times, - perf_data_gen_fixed_len) +from sdc.tests.tests_perf.test_perf_utils import perf_data_gen_fixed_len from .generator import generate_test_cases from .generator import TestCase as TC +rolling_usecase_tmpl = """ +def series_rolling_{method_name}_usecase(data, {extra_usecase_params}): + start_time = time.time() + results = [] + for i in range({ncalls}): + res = data.rolling({rolling_params}).{method_name}({method_params}) + results.append(res) + end_time = time.time() + return end_time - start_time, results +""" + + def get_rolling_params(window=100, min_periods=None): """Generate supported rolling parameters""" rolling_params = [f'{window}'] @@ -48,14 +58,37 @@ def get_rolling_params(window=100, min_periods=None): return ', '.join(rolling_params) +def gen_series_rolling_usecase(method_name, rolling_params=None, + extra_usecase_params='', + method_params='', ncalls=1): + """Generate series rolling method use case""" + if not rolling_params: + rolling_params = get_rolling_params() + + func_text = rolling_usecase_tmpl.format(**{ + 'method_name': method_name, + 'extra_usecase_params': extra_usecase_params, + 'rolling_params': rolling_params, + 'method_params': method_params, + 'ncalls': ncalls + }) + + global_vars = {'np': np, 'time': time} + loc_vars = {} + exec(func_text, global_vars, loc_vars) + _series_rolling_usecase = loc_vars[f'series_rolling_{method_name}_usecase'] + + return _series_rolling_usecase + + # python -m sdc.runtests sdc.tests.tests_perf.test_perf_series_rolling.TestSeriesRollingMethods class TestSeriesRollingMethods(TestBase): - # more than 19 columns raise SystemError: CPUDispatcher() returned a result with an error set - max_columns_num = 19 - @classmethod def setUpClass(cls): super().setUpClass() + cls.map_ncalls_dlength = { + 'sum': (100, [10 ** 5]), + } def _test_case(self, pyfunc, name, total_data_length, data_num=1, input_data=test_global_input_data_float64): @@ -82,6 +115,20 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1, self._test_jit(pyfunc, base, *args) self._test_py(pyfunc, base, *args) + def _test_series_rolling_method(self, name, rolling_params=None, + extra_usecase_params='', method_params=''): + ncalls, total_data_length = self.map_ncalls_dlength[name] + usecase = gen_series_rolling_usecase(name, rolling_params=rolling_params, + extra_usecase_params=extra_usecase_params, + method_params=method_params, ncalls=ncalls) + data_num = 1 + if extra_usecase_params: + data_num += len(extra_usecase_params.split(', ')) + self._test_case(usecase, name, total_data_length, data_num=data_num) + + def test_series_rolling_sum(self): + self._test_series_rolling_method('sum') + cases = [ TC(name='apply', size=[10 ** 7], params='func=lambda x: np.nan if len(x) == 0 else x.mean()'), @@ -96,7 +143,6 @@ def _test_case(self, pyfunc, name, total_data_length, data_num=1, TC(name='quantile', size=[10 ** 7], params='0.2'), TC(name='skew', size=[10 ** 7]), TC(name='std', size=[10 ** 7]), - TC(name='sum', size=[4 * 10 ** 7]), TC(name='var', size=[10 ** 7]), ] From 0e63d1a351158fcc9d769b4aa1e461f3614b3343 Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Feb 2020 18:27:27 +0300 Subject: [PATCH 07/12] Fix issue in case of multithreading --- sdc/utilities/window_utils.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py index a5a6590c4..5b3bf1371 100644 --- a/sdc/utilities/window_utils.py +++ b/sdc/utilities/window_utils.py @@ -55,18 +55,34 @@ def result(self): return self._result + def _include(self, value): + """Calculate the window sum with new value.""" + if np.isfinite(value): + self._nfinite += 1 + self._result += value + + def _exclude(self, value): + """Calculate the window sum without old value.""" + if np.isfinite(value): + self._nfinite -= 1 + self._result -= value + def roll(self, data, idx): """Calculate the window sum.""" + if self._nroll == 0: + start = max(idx + 1 - self.size, 0) + for i in range(start, idx): + value = data[i] + self._include(value) + + self._nroll += 1 + if self._nroll >= self.size: - excluded_value = data[idx - self.size] - if np.isfinite(excluded_value): - self._nfinite -= 1 - self._result -= excluded_value + value = data[idx - self.size] + self._exclude(value) value = data[idx] - if np.isfinite(value): - self._nfinite += 1 - self._result += value + self._include(value) self._nroll += 1 From a8572f839f66b200264aeeae216f50558bb8aed6 Mon Sep 17 00:00:00 2001 From: Denis Date: Wed, 19 Feb 2020 09:30:18 +0300 Subject: [PATCH 08/12] Minor changes in WindowSum --- sdc/utilities/window_utils.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py index 5b3bf1371..a4dbc65de 100644 --- a/sdc/utilities/window_utils.py +++ b/sdc/utilities/window_utils.py @@ -55,37 +55,38 @@ def result(self): return self._result - def _include(self, value): + def _put(self, value): """Calculate the window sum with new value.""" if np.isfinite(value): self._nfinite += 1 self._result += value - def _exclude(self, value): + def _pop(self, value): """Calculate the window sum without old value.""" if np.isfinite(value): self._nfinite -= 1 self._result -= value - def roll(self, data, idx): - """Calculate the window sum.""" - if self._nroll == 0: - start = max(idx + 1 - self.size, 0) - for i in range(start, idx): - value = data[i] - self._include(value) - - self._nroll += 1 - + def _roll(self, data, idx): + """Calculate the window sum based on the previous result.""" if self._nroll >= self.size: value = data[idx - self.size] - self._exclude(value) + self._pop(value) value = data[idx] - self._include(value) + self._put(value) self._nroll += 1 + def roll(self, data, idx): + """Calculate the window sum.""" + if self._nroll == 0: + start = max(idx + 1 - self.size, 0) + for i in range(start, idx): + self._roll(data, i) + + self._roll(data, idx) + def free(self): """Free the window.""" self._nfinite = 0 From 713f6235a54a9535f93485c4c464f3df3095fef5 Mon Sep 17 00:00:00 2001 From: Denis Date: Wed, 19 Feb 2020 13:38:33 +0300 Subject: [PATCH 09/12] Enable scalability for series.rolling.sum() --- .../hpat_pandas_series_rolling_functions.py | 54 +++++++++++-------- sdc/utilities/window_utils.py | 42 +++++++-------- 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index 20d1319bb..fef3ab448 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -308,6 +308,35 @@ def apply_minp(arr, ddof, minp): gen_hpat_pandas_series_rolling_ddof_impl(arr_var)) +def gen_sdc_pandas_series_rolling_impl(window_cls): + """Generate series rolling methods implementations based on window class""" + def impl(self): + win = self._window + minp = self._min_periods + + input_series = self._data + input_arr = input_series._data + length = len(input_arr) + output_arr = numpy.empty(length, dtype=float64) + + chunks = parallel_chunks(length) + for i in prange(len(chunks)): + chunk = chunks[i] + window = window_cls(win, minp) + for idx in range(chunk.start, chunk.stop): + window.roll(input_arr, idx) + output_arr[idx] = window.result + window.free() + + return pandas.Series(output_arr, input_series._index, + name=input_series._name) + return impl + + +sdc_pandas_rolling_series_sum_impl = register_jitable( + gen_sdc_pandas_series_rolling_impl(WindowSum)) + + @sdc_rolling_overload(SeriesRollingType, 'apply') def hpat_pandas_series_rolling_apply(self, func, raw=None): @@ -613,34 +642,13 @@ def hpat_pandas_series_rolling_std(self, ddof=1): return hpat_pandas_rolling_series_std_impl -@sdc_rolling_overload(SeriesRollingType, 'sum') +@sdc_overload_method(SeriesRollingType, 'sum') def hpat_pandas_series_rolling_sum(self): ty_checker = TypeChecker('Method rolling.sum().') ty_checker.check(self, SeriesRollingType) - def _sdc_pandas_series_rolling_sum_impl(self): - win = self._window - minp = self._min_periods - - input_series = self._data - input_arr = input_series._data - length = len(input_arr) - output_arr = numpy.empty(length, dtype=float64) - - chunks = parallel_chunks(length) - for i in prange(len(chunks)): - chunk = chunks[i] - window = WindowSum(win, minp) - for idx in range(chunk.start, chunk.stop): - window.roll(input_arr, idx) - output_arr[idx] = window.result - window.free() - - return pandas.Series(output_arr, input_series._index, - name=input_series._name) - - return _sdc_pandas_series_rolling_sum_impl + return sdc_pandas_rolling_series_sum_impl @sdc_rolling_overload(SeriesRollingType, 'var') diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py index a4dbc65de..e8b33fe63 100644 --- a/sdc/utilities/window_utils.py +++ b/sdc/utilities/window_utils.py @@ -55,38 +55,34 @@ def result(self): return self._result - def _put(self, value): - """Calculate the window sum with new value.""" - if np.isfinite(value): - self._nfinite += 1 - self._result += value + def roll(self, data, idx): + """Calculate the window sum.""" + if self._nroll == 0: + start = max(idx + 1 - self.size, 0) + for i in range(start, idx): + value = data[idx] + # calculate the window sum with new value + if np.isfinite(value): + self._nfinite += 1 + self._result += value - def _pop(self, value): - """Calculate the window sum without old value.""" - if np.isfinite(value): - self._nfinite -= 1 - self._result -= value + self._nroll += 1 - def _roll(self, data, idx): - """Calculate the window sum based on the previous result.""" if self._nroll >= self.size: + # calculate the window sum without old value. value = data[idx - self.size] - self._pop(value) + if np.isfinite(value): + self._nfinite -= 1 + self._result -= value value = data[idx] - self._put(value) + # calculate the window sum with new value + if np.isfinite(value): + self._nfinite += 1 + self._result += value self._nroll += 1 - def roll(self, data, idx): - """Calculate the window sum.""" - if self._nroll == 0: - start = max(idx + 1 - self.size, 0) - for i in range(start, idx): - self._roll(data, i) - - self._roll(data, idx) - def free(self): """Free the window.""" self._nfinite = 0 From 5be165c6c047a34f2ca6a2d5c2116173d60d63b8 Mon Sep 17 00:00:00 2001 From: Denis Date: Wed, 19 Feb 2020 13:49:02 +0300 Subject: [PATCH 10/12] Change perf test on series.rolling.sum --- sdc/tests/tests_perf/test_perf_series_rolling.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdc/tests/tests_perf/test_perf_series_rolling.py b/sdc/tests/tests_perf/test_perf_series_rolling.py index 31261a7f9..8db933af5 100644 --- a/sdc/tests/tests_perf/test_perf_series_rolling.py +++ b/sdc/tests/tests_perf/test_perf_series_rolling.py @@ -40,12 +40,10 @@ rolling_usecase_tmpl = """ def series_rolling_{method_name}_usecase(data, {extra_usecase_params}): start_time = time.time() - results = [] for i in range({ncalls}): res = data.rolling({rolling_params}).{method_name}({method_params}) - results.append(res) end_time = time.time() - return end_time - start_time, results + return end_time - start_time, res """ @@ -87,7 +85,7 @@ class TestSeriesRollingMethods(TestBase): def setUpClass(cls): super().setUpClass() cls.map_ncalls_dlength = { - 'sum': (100, [10 ** 5]), + 'sum': (100, [2 * 10 ** 5]), } def _test_case(self, pyfunc, name, total_data_length, data_num=1, From ac675e59401796a93df283e390799b58ab2152d4 Mon Sep 17 00:00:00 2001 From: Denis Date: Wed, 19 Feb 2020 19:34:48 +0300 Subject: [PATCH 11/12] Refuse class WindowSum --- .../hpat_pandas_series_rolling_functions.py | 57 ++++++++-- .../tests_perf/test_perf_series_rolling.py | 2 +- sdc/utilities/window_utils.py | 101 ------------------ 3 files changed, 48 insertions(+), 112 deletions(-) delete mode 100644 sdc/utilities/window_utils.py diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index fef3ab448..b04867043 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -40,7 +40,6 @@ from sdc.utilities.prange_utils import parallel_chunks from sdc.utilities.sdc_typing_utils import TypeChecker from sdc.utilities.utils import sdc_overload_method, sdc_register_jitable -from sdc.utilities.window_utils import WindowSum # disabling parallel execution for rolling due to numba issue https://github.com/numba/numba/issues/5098 @@ -308,8 +307,28 @@ def apply_minp(arr, ddof, minp): gen_hpat_pandas_series_rolling_ddof_impl(arr_var)) -def gen_sdc_pandas_series_rolling_impl(window_cls): - """Generate series rolling methods implementations based on window class""" +@sdc_register_jitable +def pop_sum(value, nfinite, result): + """Calculate the window sum without old value.""" + if numpy.isfinite(value): + nfinite -= 1 + result -= value + + return nfinite, result + + +@sdc_register_jitable +def put_sum(value, nfinite, result): + """Calculate the window sum with new value.""" + if numpy.isfinite(value): + nfinite += 1 + result += value + + return nfinite, result + + +def gen_sdc_pandas_series_rolling_impl(pop, put, init_result=numpy.nan): + """Generate series rolling methods implementations based on pop/put funcs""" def impl(self): win = self._window minp = self._min_periods @@ -322,19 +341,37 @@ def impl(self): chunks = parallel_chunks(length) for i in prange(len(chunks)): chunk = chunks[i] - window = window_cls(win, minp) + nroll = 0 + nfinite = 0 + result = init_result for idx in range(chunk.start, chunk.stop): - window.roll(input_arr, idx) - output_arr[idx] = window.result - window.free() + if nroll == 0: + start = max(idx + 1 - win, 0) + for j in range(start, idx): + value = input_arr[j] + nfinite, result = put(value, nfinite, result) + nroll += 1 + + if nroll >= win: + value = input_arr[idx - win] + nfinite, result = pop(value, nfinite, result) + + value = input_arr[idx] + nfinite, result = put(value, nfinite, result) + nroll += 1 + + if nfinite < minp: + output_arr[idx] = numpy.nan + else: + output_arr[idx] = result return pandas.Series(output_arr, input_series._index, name=input_series._name) return impl -sdc_pandas_rolling_series_sum_impl = register_jitable( - gen_sdc_pandas_series_rolling_impl(WindowSum)) +sdc_pandas_series_rolling_sum_impl = register_jitable( + gen_sdc_pandas_series_rolling_impl(pop_sum, put_sum, init_result=0.)) @sdc_rolling_overload(SeriesRollingType, 'apply') @@ -648,7 +685,7 @@ def hpat_pandas_series_rolling_sum(self): ty_checker = TypeChecker('Method rolling.sum().') ty_checker.check(self, SeriesRollingType) - return sdc_pandas_rolling_series_sum_impl + return sdc_pandas_series_rolling_sum_impl @sdc_rolling_overload(SeriesRollingType, 'var') diff --git a/sdc/tests/tests_perf/test_perf_series_rolling.py b/sdc/tests/tests_perf/test_perf_series_rolling.py index 8db933af5..a38bbe1a9 100644 --- a/sdc/tests/tests_perf/test_perf_series_rolling.py +++ b/sdc/tests/tests_perf/test_perf_series_rolling.py @@ -85,7 +85,7 @@ class TestSeriesRollingMethods(TestBase): def setUpClass(cls): super().setUpClass() cls.map_ncalls_dlength = { - 'sum': (100, [2 * 10 ** 5]), + 'sum': (100, [8 * 10 ** 5]), } def _test_case(self, pyfunc, name, total_data_length, data_num=1, diff --git a/sdc/utilities/window_utils.py b/sdc/utilities/window_utils.py deleted file mode 100644 index e8b33fe63..000000000 --- a/sdc/utilities/window_utils.py +++ /dev/null @@ -1,101 +0,0 @@ -# ***************************************************************************** -# Copyright (c) 2020, Intel Corporation All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# -# Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; -# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, -# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# ***************************************************************************** - -import numpy as np -from numba import jitclass, njit, types - - -spec = [ - ('size', types.int64), - ('minp', types.int64), - ('_nfinite', types.int64), - ('_nroll', types.int64), - ('_result', types.float64) -] - - -@jitclass(spec) -class WindowSum: - def __init__(self, size, minp): - self.size = size - self.minp = minp - - self._nfinite = 0 - self._nroll = 0 - self._result = 0. - - @property - def result(self): - """Get the latest result taking into account min periods.""" - if self._nfinite < self.minp: - return np.nan - - return self._result - - def roll(self, data, idx): - """Calculate the window sum.""" - if self._nroll == 0: - start = max(idx + 1 - self.size, 0) - for i in range(start, idx): - value = data[idx] - # calculate the window sum with new value - if np.isfinite(value): - self._nfinite += 1 - self._result += value - - self._nroll += 1 - - if self._nroll >= self.size: - # calculate the window sum without old value. - value = data[idx - self.size] - if np.isfinite(value): - self._nfinite -= 1 - self._result -= value - - value = data[idx] - # calculate the window sum with new value - if np.isfinite(value): - self._nfinite += 1 - self._result += value - - self._nroll += 1 - - def free(self): - """Free the window.""" - self._nfinite = 0 - self._nroll = 0 - self._result = 0. - - -if __name__ == '__main__': - @njit - def sum(): - win_sum = WindowSum(3, 2) - data = list(range(5)) # 0, 1, 2, 3, 4 - for i in data: - win_sum.roll(data, i) - print(win_sum.result) # nan, 1.0, 3.0, 6.0, 9.0 - sum() From 86f3d5aceede719cbd5d38e09881a119ec79ef65 Mon Sep 17 00:00:00 2001 From: Denis Date: Thu, 20 Feb 2020 08:34:55 +0300 Subject: [PATCH 12/12] Refactor series.rolling.sum() --- .../hpat_pandas_series_rolling_functions.py | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/sdc/datatypes/hpat_pandas_series_rolling_functions.py b/sdc/datatypes/hpat_pandas_series_rolling_functions.py index b04867043..e14d89f02 100644 --- a/sdc/datatypes/hpat_pandas_series_rolling_functions.py +++ b/sdc/datatypes/hpat_pandas_series_rolling_functions.py @@ -327,6 +327,15 @@ def put_sum(value, nfinite, result): return nfinite, result +@sdc_register_jitable +def result_or_nan(nfinite, minp, result): + """Get result taking into account min periods.""" + if nfinite < minp: + return numpy.nan + + return result + + def gen_sdc_pandas_series_rolling_impl(pop, put, init_result=numpy.nan): """Generate series rolling methods implementations based on pop/put funcs""" def impl(self): @@ -341,29 +350,30 @@ def impl(self): chunks = parallel_chunks(length) for i in prange(len(chunks)): chunk = chunks[i] - nroll = 0 nfinite = 0 result = init_result - for idx in range(chunk.start, chunk.stop): - if nroll == 0: - start = max(idx + 1 - win, 0) - for j in range(start, idx): - value = input_arr[j] - nfinite, result = put(value, nfinite, result) - nroll += 1 - - if nroll >= win: - value = input_arr[idx - win] - nfinite, result = pop(value, nfinite, result) + prelude_start = max(0, chunk.start - win + 1) + prelude_stop = min(chunk.start, prelude_start + win) + + interlude_start = prelude_stop + interlude_stop = min(prelude_start + win, chunk.stop) + + for idx in range(prelude_start, prelude_stop): value = input_arr[idx] nfinite, result = put(value, nfinite, result) - nroll += 1 - if nfinite < minp: - output_arr[idx] = numpy.nan - else: - output_arr[idx] = result + for idx in range(interlude_start, interlude_stop): + value = input_arr[idx] + nfinite, result = put(value, nfinite, result) + output_arr[idx] = result_or_nan(nfinite, minp, result) + + for idx in range(interlude_stop, chunk.stop): + put_value = input_arr[idx] + pop_value = input_arr[idx - win] + nfinite, result = put(put_value, nfinite, result) + nfinite, result = pop(pop_value, nfinite, result) + output_arr[idx] = result_or_nan(nfinite, minp, result) return pandas.Series(output_arr, input_series._index, name=input_series._name)