From fbdb56cda18634fce0882e043ac26acd3d2e17c6 Mon Sep 17 00:00:00 2001 From: Josh Rosenberg Date: Fri, 17 Mar 2017 22:04:33 -0400 Subject: [PATCH 01/11] bpo-29842: Make Executor.map less eager so it handles large/unbounded input iterables appropriately --- Doc/library/concurrent.futures.rst | 9 ++++++-- Lib/concurrent/futures/_base.py | 33 ++++++++++++++++++++++++----- Lib/concurrent/futures/process.py | 4 ++-- Lib/test/test_concurrent_futures.py | 17 ++++++++++++++- Misc/NEWS | 6 ++++++ 5 files changed, 59 insertions(+), 10 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index d85576b8bedd8e..42b433ca07a1ac 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -38,7 +38,7 @@ Executor Objects future = executor.submit(pow, 323, 1235) print(future.result()) - .. method:: map(func, *iterables, timeout=None, chunksize=1) + .. method:: map(func, *iterables, timeout=None, chunksize=1, prefetch=None) Equivalent to :func:`map(func, *iterables) ` except *func* is executed asynchronously and several calls to *func* may be made concurrently. The @@ -54,11 +54,16 @@ Executor Objects specified by setting *chunksize* to a positive integer. For very long iterables, using a large value for *chunksize* can significantly improve performance compared to the default size of 1. With :class:`ThreadPoolExecutor`, - *chunksize* has no effect. + *chunksize* has no effect. By default, a reasonable number of tasks are + queued beyond the number of workers, an explicit *prefetch* count may be + provided to specify how many extra tasks should be queued. .. versionchanged:: 3.5 Added the *chunksize* argument. + .. versionchanged:: 3.7 + Added the *prefetch* argument. + .. 
method:: shutdown(wait=True) Signal the executor that it should free any resources that it is using diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 295489c93e56d8..57782fdba87a0c 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -4,6 +4,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import collections +import itertools import logging import threading import time @@ -520,7 +521,7 @@ def submit(self, fn, *args, **kwargs): """ raise NotImplementedError() - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): """Returns an iterator equivalent to map(fn, iter). Args: @@ -544,18 +545,40 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): """ if timeout is not None: end_time = timeout + time.time() + if prefetch is None: + prefetch = self._max_workers + if prefetch < 0: + raise ValueError("prefetch count may not be negative") - fs = [self.submit(fn, *args) for args in zip(*iterables)] + argsiter = zip(*iterables) + + fs = collections.deque(self.submit(fn, *args) for args in itertools.islice(argsiter, self._max_workers + prefetch)) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. 
def result_iterator(): + nonlocal argsiter try: - for future in fs: + while fs: if timeout is None: - yield future.result() + res = fs[0].result() else: - yield future.result(end_time - time.time()) + res = fs[0].result(end_time - time.time()) + + # Got a result, future needn't be cancelled + del fs[0] + + # Dispatch next task before yielding to keep + # pipeline full + if argsiter: + try: + args = next(argsiter) + except StopIteration: + argsiter = None + else: + fs.append(self.submit(fn, *args)) + + yield res finally: for future in fs: future.cancel() diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8f1d714193ab79..f8b02cb5cb880f 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -455,7 +455,7 @@ def submit(self, fn, *args, **kwargs): return f submit.__doc__ = _base.Executor.submit.__doc__ - def map(self, fn, *iterables, timeout=None, chunksize=1): + def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): """Returns an iterator equivalent to map(fn, iter). 
Args: @@ -481,7 +481,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): results = super().map(partial(_process_chunk, fn), _get_chunks(*iterables, chunksize=chunksize), - timeout=timeout) + timeout=timeout, prefetch=prefetch) return itertools.chain.from_iterable(results) def shutdown(self, wait=True): diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 23e95b212447c8..464bc858b080b8 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -467,7 +467,22 @@ def record_finished(n): self.executor.map(record_finished, range(10)) self.executor.shutdown(wait=True) - self.assertCountEqual(finished, range(10)) + # No guarantees on how many tasks dispatched, + # but at least one should have been dispatched + self.assertGreater(len(finished), 0) + + def test_infinite_map_input_completes_work(self): + import itertools + def identity(x): + return x + + mapobj = self.executor.map(identity, itertools.count(0)) + # Get one result, which shows we handle infinite inputs + # without waiting for all work to be dispatched + res = next(mapobj) + mapobj.close() # Make sure futures cancelled + + self.assertEqual(res, 0) def test_default_workers(self): executor = self.executor_type() diff --git a/Misc/NEWS b/Misc/NEWS index b7990c62e4f744..4ffeb0f391b6c0 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -281,6 +281,12 @@ Extension Modules Library ------- +- bpo-29842: Executor.map no longer creates all futures eagerly prior to + yielding any results. This allows it to work with huge or infinite iterables + without consuming excessive resources or crashing, making it more suitable + as a drop in replacement for the built-in map. + Patch by Josh Rosenberg. + - bpo-29800: Fix crashes in partial.__repr__ if the keys of partial.keywords are not strings. Patch by Michael Seifert. 
From 8394e3480eb9fbb5200836174024d75581a03b22 Mon Sep 17 00:00:00 2001 From: Josh Rosenberg Date: Mon, 6 May 2019 12:22:17 -0400 Subject: [PATCH 02/11] Add prefetch info to docstrings --- Lib/concurrent/futures/_base.py | 2 ++ Lib/concurrent/futures/process.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index b9888ce0d7083e..ff17bb7539b7b1 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -581,6 +581,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor. + prefetch: The number of chunks to queue beyond the number of + workers on the executor. If None, a reasonable default is used. Returns: An iterator equivalent to: map(func, *iterables) but the calls may diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 1280f900c45d99..f5f633bee1f19c 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -643,6 +643,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time. + prefetch: The number of chunks to queue beyond the number of + workers on the executor. If None, a reasonable default is used. 
Returns: An iterator equivalent to: map(func, *iterables) but the calls may From 634a3de6838a14017d4bddbf482d112a8719d7d1 Mon Sep 17 00:00:00 2001 From: Josh Rosenberg Date: Mon, 6 May 2019 12:49:44 -0400 Subject: [PATCH 03/11] Add Misc/NEWS entry --- .../next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst diff --git a/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst new file mode 100644 index 00000000000000..9318ed1f8e9df7 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst @@ -0,0 +1,4 @@ +Executor.map no longer creates all futures eagerly prior to yielding any +results. This allows it to work with huge or infinite iterables without +consuming excessive resources or crashing, making it more suitable as a drop +in replacement for the built-in map. Patch by Josh Rosenberg. From 015283128476101da1f66067d4fda22b90cc70ff Mon Sep 17 00:00:00 2001 From: Josh Rosenberg Date: Mon, 6 May 2019 15:18:12 -0400 Subject: [PATCH 04/11] Remove trailing whitespace in docs --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 02474cbf7f67d7..8e57dc1550e8b2 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -63,7 +63,7 @@ Executor Objects using a large value for *chunksize* can significantly improve performance compared to the default size of 1. With :class:`ThreadPoolExecutor`, *chunksize* has no effect. - + By default, a reasonable number of tasks are queued beyond the number of workers, an explicit *prefetch* count may be provided to specify how many extra tasks should be queued. 
From 3c38fabdfe9d625064625b4fb84aa41d59bf250d Mon Sep 17 00:00:00 2001 From: Josh Rosenberg Date: Mon, 6 May 2019 16:06:19 -0400 Subject: [PATCH 05/11] Fix behavior to follow test_free_reference requirements (generator holds no reference to result at the moment it yields) Reduce line lengths to PEP8 limits --- Lib/concurrent/futures/_base.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index ff17bb7539b7b1..efb6ce2b45c8d3 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -601,8 +601,9 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): raise ValueError("prefetch count may not be negative") argsiter = zip(*iterables) + initialargs = itertools.islice(argsiter, self._max_workers + prefetch) - fs = collections.deque(self.submit(fn, *args) for args in itertools.islice(argsiter, self._max_workers + prefetch)) + fs = collections.deque(self.submit(fn, *args) for args in initialargs) # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. 
@@ -611,9 +612,9 @@ def result_iterator(): try: while fs: if timeout is None: - res = fs[0].result() + res = [fs[0].result()] else: - res = fs[0].result(end_time - time.monotonic()) + res = [fs[0].result(end_time - time.monotonic())] # Got a result, future needn't be cancelled del fs[0] @@ -628,7 +629,7 @@ def result_iterator(): else: fs.append(self.submit(fn, *args)) - yield res + yield res.pop() finally: for future in fs: future.cancel() From f3a3791ec5903b029ced41b206aa0738b5103ff1 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 20 Feb 2020 10:17:46 +0000 Subject: [PATCH 06/11] updated concurrent.futures:ThreadPoolExecutor.map prefetch versionadded --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 8ac1f20edd8934..022d6cf9e56bf6 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -71,7 +71,7 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. - .. versionchanged:: 3.8 + .. versionchanged:: 3.9 Added the *prefetch* argument. .. method:: shutdown(wait=True, \*, cancel_futures=False) From 9234a48857dd31483274cae29133d0b419ce58a2 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 21 Jul 2021 08:13:50 +0100 Subject: [PATCH 07/11] Update Doc/library/concurrent.futures.rst --- Doc/library/concurrent.futures.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index e1b0ecb7cef622..d8961583374830 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -70,6 +70,9 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. + + .. versionchanged:: 3.11 + Added the *prefetch* argument. .. 
method:: shutdown(wait=True, *, cancel_futures=False) From b618b7f15d7eac5ca02241ea5fb22b2d061c607d Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 18 Jul 2022 16:30:45 +0100 Subject: [PATCH 08/11] Update Doc/library/concurrent.futures.rst --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index bf7632fa293658..4f33931bea2a3d 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -71,7 +71,7 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. - .. versionchanged:: 3.11 + .. versionchanged:: 3.12 Added the *prefetch* argument. .. method:: shutdown(wait=True, *, cancel_futures=False) From bf36a8d59eb5d3da8b4e00519255e5f22e70f7bd Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 23 Jul 2022 10:13:05 +0100 Subject: [PATCH 09/11] Update Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com> --- .../next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst index 9318ed1f8e9df7..4e42fe29aa856a 100644 --- a/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst +++ b/Misc/NEWS.d/next/Library/2019-05-06-12-49-21.bpo-29842.Tw-ETh.rst @@ -1,4 +1,4 @@ -Executor.map no longer creates all futures eagerly prior to yielding any -results. This allows it to work with huge or infinite iterables without +:meth:`concurrent.futures.Executor.map` no longer eagerly creates all futures prior to yielding any +results. 
This allows it to work with huge or infinite :term:`iterable` without consuming excessive resources or crashing, making it more suitable as a drop -in replacement for the built-in map. Patch by Josh Rosenberg. +in replacement for the built-in :func:`map`. Patch by Josh Rosenberg. From 3cc0b4ef4ce86a7f4009e2e5ab8adb0b8dd69301 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 23 Jul 2022 10:21:15 +0100 Subject: [PATCH 10/11] use popleft instead of fs[0] --- Lib/concurrent/futures/_base.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index a95222a8e2427d..85760c92560d23 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -612,12 +612,9 @@ def result_iterator(): try: while fs: if timeout is None: - res = [fs[0].result()] + res = fs.popleft().result() else: - res = [fs[0].result(end_time - time.monotonic())] - - # Got a result, future needn't be cancelled - del fs[0] + res = fs.popleft().result(end_time - time.monotonic())] # Dispatch next task before yielding to keep # pipeline full @@ -629,7 +626,7 @@ def result_iterator(): else: fs.append(self.submit(fn, *args)) - yield res.pop() + yield res finally: for future in fs: future.cancel() From f058908a288551055803badbdf76a3da7820fafb Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 23 Jul 2022 11:03:56 +0100 Subject: [PATCH 11/11] Update Lib/concurrent/futures/_base.py --- Lib/concurrent/futures/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 85760c92560d23..3b8ce3ce60b59b 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -614,7 +614,7 @@ def result_iterator(): if timeout is None: res = fs.popleft().result() else: - res = fs.popleft().result(end_time - time.monotonic())] + res = fs.popleft().result(end_time - time.monotonic()) # Dispatch next 
task before yielding to keep # pipeline full