From a330ca32925fdbc0d0c9823d8ef928b81218b06c Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 15 Feb 2024 11:27:49 +0100
Subject: [PATCH 1/6] Remove msgpack max_*_len options

---
 distributed/protocol/tests/test_protocol.py | 26 +++++++++++++++++++++
 distributed/protocol/utils.py               |  4 +---
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/distributed/protocol/tests/test_protocol.py b/distributed/protocol/tests/test_protocol.py
index 630239c1adc..6d3e006677d 100644
--- a/distributed/protocol/tests/test_protocol.py
+++ b/distributed/protocol/tests/test_protocol.py
@@ -208,3 +208,29 @@ def test_fallback_to_pickle():
     assert L[0].count(b"__Pickled__") == 1
     assert L[0].count(b"__Serialized__") == 1
     assert loads(L) == {np.int64(1): {2: "a"}, 3: ("b", "c"), 4: "d"}
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("typ", [bytes, str, "ext"])
+def test_large_payload(typ):
+    critical_size = 2 * 1024**3
+    if typ == bytes:
+        large_payload = critical_size * b"0"
+        expected = large_payload
+    elif typ == str:
+        large_payload = critical_size * "0"
+        expected = large_payload
+    # Testing array and map types is practically impossible since we'd have
+    # to create an actual list or dict object of critical size (i.e. the
+    # container itself, not its content). Such objects are so large that
+    # msgpack runs effectively forever.
+    # elif typ == "array":
+    #     large_payload = [b"0"] * critical_size
+    #     expected = tuple(large_payload)
+    # elif typ == "map":
+    #     large_payload = {x: b"0" for x in range(critical_size)}
+    #     expected = large_payload
+    elif typ == "ext":
+        large_payload = msgpack.ExtType(1, b"0" * critical_size)
+        expected = large_payload
+    assert loads(dumps(large_payload)) == expected
diff --git a/distributed/protocol/utils.py b/distributed/protocol/utils.py
index f7fa7a9984a..0777b8f9e0d 100644
--- a/distributed/protocol/utils.py
+++ b/distributed/protocol/utils.py
@@ -12,9 +12,7 @@
 
 BIG_BYTES_SHARD_SIZE = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))
 
-msgpack_opts = {
-    ("max_%s_len" % x): 2**31 - 1 for x in ["str", "bin", "array", "map", "ext"]
-}
+msgpack_opts = {}
 msgpack_opts["strict_map_key"] = False
 msgpack_opts["raw"] = False
 
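
A note on why the explicit limits can simply be dropped: distributed already requires msgpack >= 1.0, and since that release the unpacker derives its per-type limits from the size of the input buffer rather than the small fixed caps that older msgpack shipped with, which made the 2**31 - 1 overrides above redundant. A minimal round-trip sketch of that assumption (msgpack >= 1.0; needs several GiB of RAM and is slow):

    import msgpack

    # Assumption: with msgpack >= 1.0, unpackb() derives its max_*_len limits
    # from the input buffer size, so no explicit overrides are needed even for
    # payloads larger than 2**31 - 1 bytes.
    payload = b"0" * (2**31 + 1)
    packed = msgpack.packb(payload)  # the bin32 format holds up to 2**32 - 1 bytes
    assert msgpack.unpackb(packed) == payload
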
From 9721a1e83fb6ebb2c486d7226b290d6f7d6e7ec8 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 15 Feb 2024 11:32:15 +0100
Subject: [PATCH 2/6] Revert "Tidying of OpenSSL 1.0.2/Python 3.9 (and earlier) handling (#5854)"

This reverts commit 925c6100631a7436eb1c14bbd8cdc406f3b5d0a7.
---
 distributed/comm/tcp.py        | 25 +++++++++++++------------
 distributed/tests/test_core.py | 14 ++++++++++++++
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py
index 774d606e2a1..379b1e96b7a 100644
--- a/distributed/comm/tcp.py
+++ b/distributed/comm/tcp.py
@@ -44,14 +44,14 @@
 
 logger = logging.getLogger(__name__)
 
-# Workaround for OpenSSL 1.0.2.
-# Can drop with OpenSSL 1.1.1 used by Python 3.10+.
-# ref: https://bugs.python.org/issue42853
-if sys.version_info < (3, 10):
-    OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
-else:
-    OPENSSL_MAX_CHUNKSIZE = 256 ** ctypes.sizeof(ctypes.c_size_t) - 1
-
+# We must not load more than this into a buffer at a time
+# It's currently unclear why that is
+# see
+# - https://github.com/dask/distributed/pull/5854
+# - https://bugs.python.org/issue42853
+# - XXX
+
+C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
 MAX_BUFFER_SIZE = MEMORY_LIMIT / 2
 
 
@@ -286,8 +286,8 @@ async def write(self, msg, serializers=None, on_error="message"):
                     2,
                     range(
                         0,
-                        each_frame_nbytes + OPENSSL_MAX_CHUNKSIZE,
-                        OPENSSL_MAX_CHUNKSIZE,
+                        each_frame_nbytes + C_INT_MAX,
+                        C_INT_MAX,
                     ),
                 ):
                     chunk = each_frame[i:j]
@@ -360,7 +360,7 @@ async def read_bytes_rw(stream: IOStream, n: int) -> memoryview:
 
     for i, j in sliding_window(
         2,
-        range(0, n + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
+        range(0, n + C_INT_MAX, C_INT_MAX),
     ):
         chunk = buf[i:j]
         actual = await stream.read_into(chunk)  # type: ignore[arg-type]
@@ -432,7 +432,8 @@ class TLS(TCP):
     A TLS-specific version of TCP.
     """
 
-    max_shard_size = min(OPENSSL_MAX_CHUNKSIZE, TCP.max_shard_size)
+    # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1)
+    max_shard_size = min(C_INT_MAX, TCP.max_shard_size)
 
     def _read_extra(self):
         TCP._read_extra(self)
diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py
index 85a502bafee..ba3d13bce15 100644
--- a/distributed/tests/test_core.py
+++ b/distributed/tests/test_core.py
@@ -1481,3 +1481,17 @@ def sync_handler(val):
         assert ledger == list(range(n))
     finally:
         await comm.close()
+
+
+@pytest.mark.slow
+@gen_test()
+async def test_large_payload():
+    async with Server({"echo": echo_serialize}) as server:
+        await server.listen(0)
+
+        comm = await connect(server.address)
+        data = b"0" * 3 * 1024**3  # 3GB
+        await comm.write({"op": "echo", "x": to_serialize(data)})
+        response = await comm.read()
+        assert response["result"] == data
+        await comm.close()

From a91b8e60532758fd1bb076610bd26aefadc2c715 Mon Sep 17 00:00:00 2001
From: fjetter
Date: Thu, 15 Feb 2024 11:37:55 +0100
Subject: [PATCH 3/6] Add link to PR

---
 distributed/comm/tcp.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py
index 379b1e96b7a..c741aba83e9 100644
--- a/distributed/comm/tcp.py
+++ b/distributed/comm/tcp.py
@@ -49,7 +49,7 @@
 # see
 # - https://github.com/dask/distributed/pull/5854
 # - https://bugs.python.org/issue42853
-# - XXX
+# - https://github.com/dask/distributed/pull/8507
 
 C_INT_MAX = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1
 MAX_BUFFER_SIZE = MEMORY_LIMIT / 2
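
To make the chunked I/O above concrete: the sliding_window(2, range(...)) idiom turns a frame length into consecutive (start, stop) slice bounds no wider than C_INT_MAX. A small sketch of the idiom; chunk_bounds is a hypothetical helper, and C_INT_MAX is hard-coded to the value the ctypes expression yields on platforms with a 4-byte C int:

    from toolz import sliding_window  # tcp.py gets this via the tlz shim

    C_INT_MAX = 2**31 - 1  # 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1 for a 4-byte int

    def chunk_bounds(n: int) -> list[tuple[int, int]]:
        # Consecutive (start, stop) pairs covering [0, n) in steps of
        # C_INT_MAX, mirroring the read/write loops in distributed/comm/tcp.py.
        return list(sliding_window(2, range(0, n + C_INT_MAX, C_INT_MAX)))

    bounds = chunk_bounds(5 * 2**30)  # a 5 GiB frame
    assert bounds == [
        (0, C_INT_MAX),
        (C_INT_MAX, 2 * C_INT_MAX),
        (2 * C_INT_MAX, 3 * C_INT_MAX),
    ]

The last pair overshoots n, but slicing a buffer with it simply yields a shorter final chunk, so no explicit clamping is needed.
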
From f8f1c125240e41ebc982e85afffdc3d5c9b46cc1 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Thu, 15 Feb 2024 15:01:24 +0000
Subject: [PATCH 4/6] Tweak test

---
 distributed/protocol/tests/test_protocol.py |  3 ++-
 distributed/tests/test_core.py              | 18 +++++++++++++-----
 2 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/distributed/protocol/tests/test_protocol.py b/distributed/protocol/tests/test_protocol.py
index 6d3e006677d..c9db26fb79c 100644
--- a/distributed/protocol/tests/test_protocol.py
+++ b/distributed/protocol/tests/test_protocol.py
@@ -213,7 +213,8 @@ def test_fallback_to_pickle():
 @pytest.mark.slow
 @pytest.mark.parametrize("typ", [bytes, str, "ext"])
 def test_large_payload(typ):
-    critical_size = 2 * 1024**3
+    """See also: test_core.py::test_large_payload"""
+    critical_size = 2**31 + 1  # >2 GiB
     if typ == bytes:
         large_payload = critical_size * b"0"
         expected = large_payload
diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py
index ba3d13bce15..e75b47b7909 100644
--- a/distributed/tests/test_core.py
+++ b/distributed/tests/test_core.py
@@ -2,6 +2,7 @@
 
 import asyncio
 import contextlib
+import logging
 import os
 import random
 import socket
@@ -1485,13 +1486,20 @@ def sync_handler(val):
 
 @pytest.mark.slow
 @gen_test()
-async def test_large_payload():
+async def test_large_payload(caplog):
+    """See also: protocol/tests/test_protocol.py::test_large_payload"""
+    critical_size = 2**31 + 1  # >2 GiB
+    data = b"0" * critical_size
+
     async with Server({"echo": echo_serialize}) as server:
         await server.listen(0)
-
-        comm = await connect(server.address)
-        data = b"0" * 3 * 1024**3  # 3GB
-        await comm.write({"op": "echo", "x": to_serialize(data)})
-        response = await comm.read()
+        comm = await connect(server.address)
+
+        # At debug level, messages are dumped into the log. By default, pytest captures
+        # all logs, which would make this test extremely expensive to run.
+        with caplog.at_level(logging.INFO, logger="distributed.core"):
+            await comm.write({"op": "echo", "x": to_serialize(data)})
+            response = await comm.read()
+
         assert response["result"] == data
         await comm.close()

From 68b9c51abbfc5b8b273f80d7453bf070c1ec238e Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Thu, 15 Feb 2024 15:03:54 +0000
Subject: [PATCH 5/6] xref

---
 distributed/tests/test_core.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py
index e75b47b7909..98fddc16830 100644
--- a/distributed/tests/test_core.py
+++ b/distributed/tests/test_core.py
@@ -1495,6 +1495,7 @@ async def test_large_payload(caplog):
         await server.listen(0)
         comm = await connect(server.address)
 
+        # FIXME https://github.com/dask/distributed/issues/8465
         # At debug level, messages are dumped into the log. By default, pytest captures
         # all logs, which would make this test extremely expensive to run.
         with caplog.at_level(logging.INFO, logger="distributed.core"):

From a7b43d9a3686f2fae639adfb414ca7da7fb53ec1 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Thu, 15 Feb 2024 17:19:00 +0000
Subject: [PATCH 6/6] trigger issue

---
 distributed/tests/test_core.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py
index 98fddc16830..93bb18f16d3 100644
--- a/distributed/tests/test_core.py
+++ b/distributed/tests/test_core.py
@@ -1485,7 +1485,7 @@ def sync_handler(val):
 
 @pytest.mark.slow
-@gen_test()
+@gen_test(timeout=180)
 async def test_large_payload(caplog):
     """See also: protocol/tests/test_protocol.py::test_large_payload"""
     critical_size = 2**31 + 1  # >2 GiB
@@ -1499,7 +1499,9 @@ async def test_large_payload(caplog):
         # At debug level, messages are dumped into the log. By default, pytest captures
         # all logs, which would make this test extremely expensive to run.
         with caplog.at_level(logging.INFO, logger="distributed.core"):
-            await comm.write({"op": "echo", "x": to_serialize(data)})
+            # Note: if we wrap data in to_serialize, it will be sent as a buffer, which
+            # is not encoded by msgpack.
+            await comm.write({"op": "echo", "x": data})
             response = await comm.read()
 
         assert response["result"] == data
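
Why patch 6 is what finally triggers the issue: as the added comment says, anything wrapped in to_serialize travels as separate buffer frames that msgpack never touches, so only plain bytes exercise the msgpack encoding path. A rough sketch of the difference, assuming the dumps/loads/to_serialize helpers from distributed.protocol and a small stand-in payload:

    from distributed.protocol import dumps, loads, to_serialize

    data = b"0" * 2**20  # stand-in for the >2 GiB payload in the test

    wrapped = dumps({"op": "echo", "x": to_serialize(data)})
    plain = dumps({"op": "echo", "x": data})

    # Both round-trip to the same message...
    assert loads(wrapped) == loads(plain) == {"op": "echo", "x": data}

    # ...but `wrapped` ships the payload in dedicated buffer frames outside of
    # msgpack, while `plain` embeds all of it inside the msgpack-encoded frame,
    # the path that used to fail above 2**31 - 1 bytes.
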