Skip to content

Commit 1c8fab8

Browse files
fix: better handle ConnectionResetError and log timing
1 parent 1a68135 commit 1c8fab8

File tree

1 file changed

+56
-30
lines changed

1 file changed

+56
-30
lines changed

tcp_modbus_aio/client.py

Lines changed: 56 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,8 @@ async def send_modbus_message(
411411

412412
time_budget_remaining = timeout if timeout is not None else float("inf")
413413

414-
with catchtime() as t:
414+
# STEP ONE: WE ACQUIRE THE LOCK TO THE CONNECTION
415+
with catchtime() as lock_t:
415416
try:
416417
await asyncio.wait_for(
417418
self._comms_lock.acquire(), time_budget_remaining
@@ -420,40 +421,51 @@ async def send_modbus_message(
420421
raise ModbusCommunicationTimeoutError(
421422
f"Failed to acquire lock to send request {msg_str} to modbus device {self.host}"
422423
)
423-
time_budget_remaining -= t()
424+
time_budget_remaining -= lock_t()
424425

425426
try:
426427
if self.logger is not None:
427428
self.logger.debug(
428429
f"[{self}][send_modbus_message] acquired lock to send {msg_str}"
429430
)
430431

431-
with catchtime() as t:
432+
# STEP TWO: CREATE A CONNECTION IF ONE DOES NOT EXIST
433+
with catchtime() as conn_t:
432434
reader, writer = await self._get_tcp_connection(
433435
timeout=time_budget_remaining
434436
)
435-
time_budget_remaining -= t()
437+
time_budget_remaining -= conn_t()
436438

439+
# STEP THREE: WRITE OUR REQUEST
437440
try:
438441
writer.write(request_adu)
439442

440-
with catchtime() as t:
443+
with catchtime() as write_t:
441444
await asyncio.wait_for(writer.drain(), time_budget_remaining)
442-
time_budget_remaining -= t()
445+
time_budget_remaining -= write_t()
443446

444447
if self.logger is not None:
445448
self.logger.debug(f"[{self}][send_modbus_message] wrote {msg_str}")
446449

447-
except (asyncio.TimeoutError, OSError) as e:
450+
except (asyncio.TimeoutError, OSError, ConnectionResetError):
451+
# Clear connection no matter what if we fail on the write
452+
# TODO: consider revisiting this to only do it on OSError and ConnectionResetError
453+
# (but Gru is scared about partial writes)
454+
455+
if self.logger is not None:
456+
self.logger.warning(
457+
f"[{self}][send_modbus_message] Failed to send data to modbus device for "
458+
f"request {msg_str}, clearing connection"
459+
)
460+
461+
await self.clear_tcp_connection()
462+
448463
if retries > 0:
449464
if self.logger is not None:
450465
self.logger.warning(
451-
f"[{self}][send_modbus_message] Failed to send data to modbus device for "
452-
f"request {msg_str}, retrying {retries} more time(s)"
466+
f"[{self}][send_modbus_message] Retrying {retries} more time(s) after failure to write"
453467
)
454468

455-
await self.clear_tcp_connection()
456-
457469
# release the lock before retrying (so we can re-get it)
458470
self._comms_lock.release()
459471

@@ -463,13 +475,10 @@ async def send_modbus_message(
463475
retries=retries - 1,
464476
)
465477

466-
raise (
467-
ModbusCommunicationTimeoutError
468-
if isinstance(e, asyncio.TimeoutError)
469-
else ModbusCommunicationFailureError
470-
)(f"Failed to write request {msg_str} to modbus device {self.host}")
478+
raise
471479

472480
try:
481+
# STEP FOUR: READ THE MBAP HEADER FROM THE RESPONSE (AND ANY JUNK)
473482
expected_response_mbap_header = struct.pack(
474483
MBAP_HEADER_STRUCT_FORMAT,
475484
request_transaction_id,
@@ -478,12 +487,12 @@ async def send_modbus_message(
478487
self.slave_id,
479488
)
480489

481-
with catchtime() as t:
490+
with catchtime() as read_mbap_t:
482491
response_up_to_mbap_header = await asyncio.wait_for(
483492
reader.readuntil(expected_response_mbap_header),
484493
timeout=time_budget_remaining,
485494
)
486-
time_budget_remaining -= t()
495+
time_budget_remaining -= read_mbap_t()
487496

488497
if len(response_up_to_mbap_header) > MODBUS_MBAP_SIZE:
489498
# TODO: consider introspecting the discarded traffic here for better introspection
@@ -493,30 +502,47 @@ async def send_modbus_message(
493502
"before mbap header, likely catching up stream after timeouts"
494503
)
495504

496-
with catchtime() as t:
505+
# STEP FOUR: READ THE RESPONSE PDU
506+
with catchtime() as read_pdu_time:
497507
response_pdu = await asyncio.wait_for(
498508
reader.readexactly(request_function.expected_response_pdu_size),
499509
timeout=time_budget_remaining,
500510
)
501-
time_budget_remaining -= t()
511+
time_budget_remaining -= read_pdu_time()
502512

503513
except asyncio.TimeoutError:
504-
if error_on_no_response:
505-
raise ModbusCommunicationTimeoutError(
506-
f"Failed to read response to {msg_str} from modbus device {self.host}"
507-
)
514+
# Sometimes it is ok to not hear back
515+
if not error_on_no_response:
516+
return None
508517

509-
else:
510-
if self.logger is not None:
511-
self.logger.warning(
512-
f"[{self}][send_modbus_message] failed to read response to {msg_str}"
513-
)
518+
raise
519+
520+
except (asyncio.TimeoutError, OSError, ConnectionResetError) as e:
521+
# We clear the connection if the connection was reset by peer or was an OS error
522+
if isinstance(e, (OSError, ConnectionResetError)):
523+
print("CLEARING TCP ON GENERAL FAIL")
524+
await self.clear_tcp_connection()
525+
526+
raise (
527+
ModbusCommunicationTimeoutError
528+
if isinstance(e, asyncio.TimeoutError)
529+
else ModbusCommunicationFailureError
530+
)(
531+
f"Request {msg_str} failed to {self.host}:{self.port} ({type(e).__name__}({e}))"
532+
) from e
514533

515-
return None
516534
finally:
517535
if self._comms_lock.locked():
518536
self._comms_lock.release()
519537

538+
if self.logger is not None:
539+
self.logger.debug(
540+
f"[{self}][send_modbus_message] executed request/response with timing "
541+
f"lock={1000*lock_t():0.2f}ms conn={1000*conn_t():0.2f}ms "
542+
f"write={1000*write_t():0.2f}ms read_mbap={1000*read_mbap_t():0.2f}ms "
543+
f"read_pdu={1000*read_pdu_time():0.2f}ms"
544+
)
545+
520546
response_function = create_function_from_response_pdu(
521547
response_pdu, request_function
522548
)

0 commit comments

Comments
 (0)