-
-
Notifications
You must be signed in to change notification settings - Fork 32.3k
bpo-29842: Make Executor.map less eager so it handles large/unbounded… #18566
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fbdb56c
6281379
8394e34
634a3de
0152831
3c38fab
d32de88
f3a3791
eb8d26a
9234a48
74ec669
b618b7f
13df4cd
bf36a8d
5cd250f
3cc0b4e
d33dca0
f058908
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
__author__ = 'Brian Quinlan ([email protected])' | ||
|
||
import collections | ||
import itertools | ||
import logging | ||
import threading | ||
import time | ||
|
@@ -568,7 +569,7 @@ def submit(self, fn, /, *args, **kwargs): | |
""" | ||
raise NotImplementedError() | ||
|
||
def map(self, fn, *iterables, timeout=None, chunksize=1): | ||
def map(self, fn, *iterables, timeout=None, chunksize=1, prefetch=None): | ||
"""Returns an iterator equivalent to map(fn, iter). | ||
|
||
Args: | ||
|
@@ -580,6 +581,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): | |
before being passed to a child process. This argument is only | ||
used by ProcessPoolExecutor; it is ignored by | ||
ThreadPoolExecutor. | ||
prefetch: The number of chunks to queue beyond the number of | ||
workers on the executor. If None, a reasonable default is used. | ||
|
||
Returns: | ||
An iterator equivalent to: map(func, *iterables) but the calls may | ||
|
@@ -592,21 +595,38 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): | |
""" | ||
if timeout is not None: | ||
end_time = timeout + time.monotonic() | ||
if prefetch is None: | ||
prefetch = self._max_workers | ||
if prefetch < 0: | ||
raise ValueError("prefetch count may not be negative") | ||
|
||
fs = [self.submit(fn, *args) for args in zip(*iterables)] | ||
argsiter = zip(*iterables) | ||
initialargs = itertools.islice(argsiter, self._max_workers + prefetch) | ||
|
||
fs = collections.deque(self.submit(fn, *args) for args in initialargs) | ||
|
||
# Yield must be hidden in closure so that the futures are submitted | ||
# before the first iterator value is required. | ||
def result_iterator(): | ||
nonlocal argsiter | ||
try: | ||
# reverse to keep finishing order | ||
fs.reverse() | ||
while fs: | ||
# Careful not to keep a reference to the popped future | ||
if timeout is None: | ||
yield fs.pop().result() | ||
res = fs.popleft().result() | ||
else: | ||
yield fs.pop().result(end_time - time.monotonic()) | ||
Reviewer note: this does not correctly cancel the future that is currently being waited on; see issue #95166 for details. Reproduction script: import contextlib
import functools
import concurrent.futures
import threading
import sys
def fn(num, stop_event):
    """Repro worker: return a marker string for *num*.

    num == 1 blocks until *stop_event* is set before returning;
    num == 2 returns immediately; anything else returns None.
    """
    if num == 2:
        return "done 2"
    if num == 1:
        stop_event.wait()
        return "done 1"
    # Any other value falls through and returns None implicitly.
def main():
    """Reproduce GH-95166: a timed-out Executor.map does not cancel the awaited future.

    Submits a blocking task to a single-worker pool, then asks ``pool.map``
    for a result with a 1-second timeout.  After the expected TimeoutError,
    the log must show that only the "first" task ever ran — if map()'s queued
    tasks execute after the timeout, the final assertion fails.

    Returns:
        None (so ``sys.exit(main())`` exits with status 0 on success).

    Raises:
        RuntimeError: if the map result arrives without timing out.
        AssertionError: if any task other than "first" was executed.
    """
    stop_event = threading.Event()
    log = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        def print_n_wait(ident):
            # Record start/stop so the final assertion can detect whether
            # map()'s queued tasks were ever executed.
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        # Occupy the single worker so the map() tasks can never start.
        fut = pool.submit(print_n_wait, ident="first")
        try:
            # closing() ensures the map generator is closed (and its pending
            # futures cancelled) as soon as we leave the block.
            with contextlib.closing(pool.map(print_n_wait, ["second", "third"], timeout=1)) as gen:
                try:
                    next(gen)
                except concurrent.futures.TimeoutError:
                    print("timed out")
                else:
                    raise RuntimeError("timeout expected")
        finally:
            # Unblock the worker so the pool can shut down.
            stop_event.set()
    assert log == ["ident='first' started", "ident='first' stopped"], f"{log=} is wrong"
# Script entry point: exit with main()'s return value (None -> exit status 0).
if __name__ == "__main__":
    sys.exit(main())
||
res = fs.popleft().result(end_time - time.monotonic()) | ||
|
||
# Dispatch next task before yielding to keep | ||
# pipeline full | ||
if argsiter: | ||
try: | ||
args = next(argsiter) | ||
except StopIteration: | ||
argsiter = None | ||
else: | ||
fs.append(self.submit(fn, *args)) | ||
Reviewer note: if the executor has already been shut down, this call to submit() will raise a RuntimeError. (@rdarder)
||
|
||
yield res | ||
finally: | ||
for future in fs: | ||
future.cancel() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
:meth:`concurrent.futures.Executor.map` no longer eagerly creates all futures prior to yielding any
results. This allows it to work with huge or infinite :term:`iterables <iterable>` without
consuming excessive resources or crashing, making it more suitable as a drop-in
replacement for the built-in :func:`map`. Patch by Josh Rosenberg.
Uh oh!
There was an error while loading. Please reload this page.