|
12 | 12 | class DecodePDU:
|
13 | 13 | """Decode pdu requests/responses (server/client)."""
|
14 | 14 |
|
15 |
| - _pdu_sub_class_table: set[tuple[type[ModbusPDU], type[ModbusPDU]]] = set() |
16 | 15 | pdu_table: dict[int, tuple[type[ModbusPDU], type[ModbusPDU]]] = {}
|
17 | 16 | pdu_sub_table: dict[int, dict[int, tuple[type[ModbusPDU], type[ModbusPDU]]]] = {}
|
18 | 17 |
|
19 | 18 |
|
20 | 19 | def __init__(self, is_server: bool) -> None:
|
21 | 20 | """Initialize function_tables."""
|
22 | 21 | self.pdu_inx = 0 if is_server else 1
|
23 |
| - inx = 0 if is_server else 1 |
24 |
| - self.sub_lookup: dict[int, dict[int, type[ModbusPDU]]] = {} |
25 |
| - for f in self._pdu_sub_class_table: |
26 |
| - if (function_code := f[inx].function_code) not in self.sub_lookup: |
27 |
| - self.sub_lookup[function_code] = {f[inx].sub_function_code: f[inx]} |
28 |
| - else: |
29 |
| - self.sub_lookup[function_code][f[inx].sub_function_code] = f[inx] |
30 | 22 |
|
31 | 23 | def lookupPduClass(self, data: bytes) -> type[ModbusPDU] | None:
|
32 | 24 | """Use `function_code` to determine the class of the PDU."""
|
@@ -58,7 +50,6 @@ def add_sub_pdu(cls, req: type[ModbusPDU], resp: type[ModbusPDU]):
|
58 | 50 | if req.function_code not in cls.pdu_sub_table:
|
59 | 51 | cls.pdu_sub_table[req.function_code] = {}
|
60 | 52 | cls.pdu_sub_table[req.function_code][req.sub_function_code] = (req, resp)
|
61 |
| - cls._pdu_sub_class_table.add((req, resp)) |
62 | 53 |
|
63 | 54 | def register(self, custom_class: type[ModbusPDU]) -> None:
|
64 | 55 | """Register a function and sub function class with the decoder."""
|
|
0 commit comments