Skip to content

Commit 1edde1e

Browse files
Refactor EVM node synchronization (#958)
1 parent a8618fe commit 1edde1e

File tree

17 files changed

+305
-166
lines changed

17 files changed

+305
-166
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning].
66

7+
## [Unreleased]
8+
9+
### Fixed
10+
11+
- evm.node: Fixed default ratelimit sleep time being too high.
12+
- evm.subsquid.transactions: Fixed issue with the `node_only` flag being ignored.
13+
14+
### Performance
15+
16+
- evm.subsquid: Dynamically adjust the batch size when syncing with a node.
17+
718
## [7.5.0] - 2024-03-08
819

920
### Added

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ black: ## Format with black
3131
black ${SOURCE}
3232

3333
ruff: ## Lint with ruff
34-
ruff check --fix ${SOURCE}
34+
ruff check --fix --unsafe-fixes ${SOURCE}
3535

3636
mypy: ## Lint with mypy
3737
mypy ${SOURCE}

src/dipdup/datasources/evm_node.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ async def wait_level(self) -> None:
8888

8989
class EvmNodeDatasource(IndexDatasource[EvmNodeDatasourceConfig]):
9090
_default_http_config = HttpConfig(
91-
batch_size=32,
92-
ratelimit_sleep=30,
91+
batch_size=10,
92+
ratelimit_sleep=1,
9393
polling_interval=1.0,
9494
)
9595

src/dipdup/fetcher.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ async def fetch(self) -> None:
136136

137137

138138
class DataFetcher(ABC, Generic[FetcherBufferT]):
139+
"""Fetches contract data from REST API, merges them and yields by level."""
140+
139141
def __init__(
140142
self,
141143
datasource: IndexDatasource[Any],
@@ -149,4 +151,9 @@ def __init__(
149151
self._head = 0
150152

151153
@abstractmethod
152-
def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[FetcherBufferT, ...]]]: ...
154+
def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[FetcherBufferT, ...]]]:
155+
"""Iterate over events data from REST.
156+
157+
Resulting data is splitted by level, deduped, sorted and ready to be processed by TzktEventsIndex.
158+
"""
159+
...

src/dipdup/indexes/evm_node.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import asyncio
2+
import random
3+
from abc import ABC
4+
from collections import defaultdict
5+
from collections import deque
6+
from typing import Any
7+
from typing import Generic
8+
9+
from dipdup.datasources.evm_node import EvmNodeDatasource
10+
from dipdup.exceptions import FrameworkException
11+
from dipdup.fetcher import DataFetcher
12+
from dipdup.fetcher import FetcherBufferT
13+
14+
EVM_NODE_READAHEAD_LIMIT = 5000
15+
MIN_BATCH_SIZE = 10
16+
MAX_BATCH_SIZE = 10000
17+
BATCH_SIZE_UP = 1.1
18+
BATCH_SIZE_DOWN = 0.5
19+
20+
21+
class EvmNodeFetcher(Generic[FetcherBufferT], DataFetcher[FetcherBufferT], ABC):
22+
def __init__(
23+
self,
24+
datasources: tuple[EvmNodeDatasource, ...],
25+
first_level: int,
26+
last_level: int,
27+
) -> None:
28+
super().__init__(datasources[0], first_level, last_level)
29+
self._datasources = datasources
30+
31+
def get_next_batch_size(self, batch_size: int, ratelimited: bool) -> int:
32+
if ratelimited:
33+
batch_size = int(batch_size * BATCH_SIZE_DOWN)
34+
else:
35+
batch_size = int(batch_size * BATCH_SIZE_UP)
36+
37+
batch_size = min(MAX_BATCH_SIZE, batch_size)
38+
batch_size = max(MIN_BATCH_SIZE, batch_size)
39+
return int(batch_size)
40+
41+
def get_random_node(self) -> EvmNodeDatasource:
42+
if not self._datasources:
43+
raise FrameworkException('A node datasource requested, but none attached to this index')
44+
return random.choice(self._datasources)
45+
46+
async def get_blocks_batch(
47+
self,
48+
levels: set[int],
49+
full_transactions: bool = False,
50+
node: EvmNodeDatasource | None = None,
51+
) -> dict[int, dict[str, Any]]:
52+
tasks: deque[asyncio.Task[Any]] = deque()
53+
blocks: dict[int, Any] = {}
54+
node = node or self.get_random_node()
55+
56+
async def _fetch(level: int) -> None:
57+
blocks[level] = await node.get_block_by_level(
58+
block_number=level,
59+
full_transactions=full_transactions,
60+
)
61+
62+
for level in levels:
63+
tasks.append(
64+
asyncio.create_task(
65+
_fetch(level),
66+
name=f'get_block_range:{level}',
67+
),
68+
)
69+
70+
await asyncio.gather(*tasks)
71+
return blocks
72+
73+
async def get_logs_batch(
74+
self,
75+
first_level: int,
76+
last_level: int,
77+
node: EvmNodeDatasource | None = None,
78+
) -> dict[int, list[dict[str, Any]]]:
79+
grouped_logs: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
80+
node = node or self.get_random_node()
81+
logs = await node.get_logs(
82+
{
83+
'fromBlock': hex(first_level),
84+
'toBlock': hex(last_level),
85+
},
86+
)
87+
for log in logs:
88+
grouped_logs[int(log['blockNumber'], 16)].append(log)
89+
return grouped_logs

src/dipdup/indexes/evm_subsquid.py

Lines changed: 3 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
import asyncio
21
import random
32
from abc import ABC
43
from abc import abstractmethod
5-
from collections import defaultdict
6-
from collections import deque
74
from typing import Any
85
from typing import Generic
96
from typing import TypeVar
@@ -27,6 +24,8 @@
2724
from dipdup.package import DipDupPackage
2825
from dipdup.prometheus import Metrics
2926

27+
SUBSQUID_READAHEAD_LIMIT = 5000
28+
3029
IndexConfigT = TypeVar('IndexConfigT', bound=SubsquidIndexConfigU)
3130
DatasourceT = TypeVar('DatasourceT', bound=SubsquidDatasource)
3231

@@ -86,11 +85,6 @@ def node_datasources(self) -> tuple[EvmNodeDatasource, ...]:
8685
def datasources(self) -> tuple[IndexDatasource[Any], ...]:
8786
return (self.datasource, *self.node_datasources)
8887

89-
def get_random_node(self) -> EvmNodeDatasource:
90-
if not self._node_datasources:
91-
raise FrameworkException('A node datasource requested, but none attached to this index')
92-
return random.choice(self._node_datasources)
93-
9488
def get_sync_level(self) -> int:
9589
"""Get level index needs to be synchronized to depending on its subscription status"""
9690
sync_levels = set()
@@ -108,51 +102,6 @@ def get_sync_level(self) -> int:
108102
# NOTE: Choose the highest level; outdated realtime messages will be dropped from the queue anyway.
109103
return max(cast(set[int], sync_levels))
110104

111-
async def get_blocks_batch(
112-
self,
113-
levels: set[int],
114-
full_transactions: bool = False,
115-
node: EvmNodeDatasource | None = None,
116-
) -> dict[int, dict[str, Any]]:
117-
tasks: deque[asyncio.Task[Any]] = deque()
118-
blocks: dict[int, Any] = {}
119-
node = node or self.get_random_node()
120-
121-
async def _fetch(level: int) -> None:
122-
blocks[level] = await node.get_block_by_level(
123-
block_number=level,
124-
full_transactions=full_transactions,
125-
)
126-
127-
for level in levels:
128-
tasks.append(
129-
asyncio.create_task(
130-
_fetch(level),
131-
name=f'get_block_range:{level}',
132-
),
133-
)
134-
135-
await asyncio.gather(*tasks)
136-
return blocks
137-
138-
async def get_logs_batch(
139-
self,
140-
first_level: int,
141-
last_level: int,
142-
node: EvmNodeDatasource | None = None,
143-
) -> dict[int, list[dict[str, Any]]]:
144-
grouped_logs: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
145-
node = node or self.get_random_node()
146-
logs = await node.get_logs(
147-
{
148-
'fromBlock': hex(first_level),
149-
'toBlock': hex(last_level),
150-
},
151-
)
152-
for log in logs:
153-
grouped_logs[int(log['blockNumber'], 16)].append(log)
154-
return grouped_logs
155-
156105
async def _get_node_sync_level(
157106
self,
158107
subsquid_level: int,
@@ -161,7 +110,7 @@ async def _get_node_sync_level(
161110
) -> int | None:
162111
if not self.node_datasources:
163112
return None
164-
node = node or self.get_random_node()
113+
node = node or random.choice(self.node_datasources)
165114

166115
node_sync_level = await node.get_head_level()
167116
subsquid_lag = abs(node_sync_level - subsquid_level)
Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
1+
import random
2+
import time
13
from collections.abc import AsyncIterator
24

5+
from dipdup.datasources.evm_node import EvmNodeDatasource
36
from dipdup.datasources.evm_subsquid import SubsquidDatasource
47
from dipdup.fetcher import DataFetcher
58
from dipdup.fetcher import readahead_by_level
9+
from dipdup.indexes.evm_node import EVM_NODE_READAHEAD_LIMIT
10+
from dipdup.indexes.evm_node import MIN_BATCH_SIZE
11+
from dipdup.indexes.evm_node import EvmNodeFetcher
12+
from dipdup.indexes.evm_subsquid import SUBSQUID_READAHEAD_LIMIT
13+
from dipdup.models.evm_node import EvmNodeLogData
614
from dipdup.models.evm_subsquid import SubsquidEventData
715

816

9-
class EventLogFetcher(DataFetcher[SubsquidEventData]):
10-
"""Fetches contract events from REST API, merges them and yields by level."""
11-
17+
class SubsquidEventFetcher(DataFetcher[SubsquidEventData]):
1218
_datasource: SubsquidDatasource
1319

1420
def __init__(
@@ -22,14 +28,73 @@ def __init__(
2228
self._topics = topics
2329

2430
async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[SubsquidEventData, ...]]]:
25-
"""Iterate over events fetched fetched from REST.
26-
27-
Resulting data is splitted by level, deduped, sorted and ready to be processed by TzktEventsIndex.
28-
"""
2931
event_iter = self._datasource.iter_event_logs(
3032
self._topics,
3133
self._first_level,
3234
self._last_level,
3335
)
34-
async for level, batch in readahead_by_level(event_iter, limit=5_000):
36+
async for level, batch in readahead_by_level(event_iter, limit=SUBSQUID_READAHEAD_LIMIT):
37+
yield level, batch
38+
39+
40+
class EvmNodeEventFetcher(EvmNodeFetcher[EvmNodeLogData]):
41+
_datasource: EvmNodeDatasource
42+
43+
async def fetch_by_level(self) -> AsyncIterator[tuple[int, tuple[EvmNodeLogData, ...]]]:
44+
event_iter = self._fetch_by_level()
45+
async for level, batch in readahead_by_level(event_iter, limit=EVM_NODE_READAHEAD_LIMIT):
3546
yield level, batch
47+
48+
async def _fetch_by_level(self) -> AsyncIterator[tuple[EvmNodeLogData, ...]]:
49+
batch_size = MIN_BATCH_SIZE
50+
batch_first_level = self._first_level
51+
ratelimited: bool = False
52+
53+
while batch_first_level <= self._last_level:
54+
node = random.choice(self._datasources)
55+
batch_size = self.get_next_batch_size(batch_size, ratelimited)
56+
ratelimited = False
57+
58+
started = time.time()
59+
60+
batch_last_level = min(
61+
batch_first_level + batch_size,
62+
self._last_level,
63+
)
64+
log_batch = await self.get_logs_batch(
65+
batch_first_level,
66+
batch_last_level,
67+
node,
68+
)
69+
70+
finished = time.time()
71+
if finished - started >= node._http_config.ratelimit_sleep:
72+
ratelimited = True
73+
74+
timestamps: dict[int, int] = {}
75+
log_levels = list(log_batch.keys())
76+
77+
# NOTE: Split log_levels to chunks of batch_size
78+
log_level_batches = [set(log_levels[i : i + batch_size]) for i in range(0, len(log_levels), batch_size)]
79+
80+
for log_level_batch in log_level_batches:
81+
82+
started = time.time()
83+
84+
block_batch = await self.get_blocks_batch(log_level_batch)
85+
for level, block in block_batch.items():
86+
timestamps[level] = int(block['timestamp'], 16)
87+
88+
finished = time.time()
89+
if finished - started >= node._http_config.ratelimit_sleep:
90+
ratelimited = True
91+
92+
for level, level_logs in log_batch.items():
93+
if not level_logs:
94+
continue
95+
96+
parsed_level_logs = tuple(EvmNodeLogData.from_json(log, timestamps[level]) for log in level_logs)
97+
98+
yield parsed_level_logs
99+
100+
batch_first_level = batch_last_level + 1

0 commit comments

Comments
 (0)