Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
ssl_handshake_timeout, ssl_shutdown_timeout=None):
self._loop = loop
self._sockets = sockets
# Weak references so abandoned transports can be detected
# Weak references so we don't break Transport's ability to
# detect abandoned transports
self._clients = weakref.WeakSet()
self._waiters = []
self._protocol_factory = protocol_factory
Expand All @@ -296,8 +297,6 @@ def _attach(self, transport):
self._clients.add(transport)

def _detach(self, transport):
# Note that 'transport' may already be missing from
# self._clients if it has been garbage collected
self._clients.discard(transport)
if len(self._clients) == 0 and self._sockets is None:
self._wakeup()
Expand Down
36 changes: 22 additions & 14 deletions Lib/test/test_asyncio/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ async def serve(rd, wr):

srv.close()
srv.close_clients()
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
await asyncio.sleep(0)
await asyncio.sleep(0)
self.assertTrue(task.done())

async def test_abort_clients(self):
Expand All @@ -223,28 +224,35 @@ async def serve(rd, wr):
self.addCleanup(srv.close)

addr = srv.sockets[0].getsockname()
(c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1])
(c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1], limit=4096)
self.addCleanup(c_wr.close)

# Make sure both sides are in a paused state
while (s_wr.transport.get_write_buffer_size() == 0 or
c_wr.transport.is_reading()):
while s_wr.transport.get_write_buffer_size() == 0:
s_wr.write(b'a' * 65536)
await asyncio.sleep(0)
await asyncio.sleep(0.1) # FIXME: More socket buffer space magically appears?
# Limit the send buffer so we can reliably overfill it
s_sock = s_wr.get_extra_info('socket')
s_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)

# Get the reader in to a paused state by sending more than twice
# the configured limit
s_wr.write(b'a' * 4096)
s_wr.write(b'a' * 4096)
s_wr.write(b'a' * 4096)
while c_wr.transport.is_reading():
await asyncio.sleep(0)

# Get the writer in a waiting state by sending data until the
# kernel stops accepting more in to the send buffer
while s_wr.transport.get_write_buffer_size() == 0:
s_wr.write(b'a' * 4096)
await asyncio.sleep(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put an upper bound on these while loops and fail the test when they loop too many times? (Possibly a for loop with a break is the best idiom to do this.)

I worry that something in the network stack might cause one or the other to loop forever, and I'd rather not waste the CPU time in CI over this.

How many iterations do you expect? Is it deterministic regardless of platform?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit sensitive to platform behaviour, but I believe I can get something that dynamically adapts


task = asyncio.create_task(srv.wait_closed())
await asyncio.sleep(0)
self.assertFalse(task.done())

# Sanity check
self.assertNotEqual(s_wr.transport.get_write_buffer_size(), 0)
self.assertFalse(c_wr.transport.is_reading())

srv.close()
srv.abort_clients()
await asyncio.sleep(0.1) # FIXME: flush call_soon()?
await asyncio.sleep(0)
await asyncio.sleep(0)
self.assertTrue(task.done())


Expand Down