Skip to content

Commit 1efdba6

Browse files
authored
Merge pull request #7 from rpcpool/move_snapshot_to_indexer_branch
Move snapshot logic into the indexer branch
2 parents 755de83 + f43f40c commit 1efdba6

File tree

1 file changed

+60
-58
lines changed

1 file changed

+60
-58
lines changed

src/main.rs

Lines changed: 60 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -202,71 +202,73 @@ async fn main() {
202202
let is_rpc_node_local = args.rpc_url.contains("127.0.0.1");
203203
let rpc_client = get_rpc_client(&args.rpc_url);
204204

205-
let last_indexed_slot = match args.start_slot {
206-
Some(start_slot) => match start_slot.as_str() {
207-
"latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await,
208-
_ => {
209-
fetch_block_parent_slot(&rpc_client, start_slot.parse::<u64>().unwrap())
210-
.await
211-
}
212-
},
213-
None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref())
214-
.await
215-
.unwrap_or(
216-
get_network_start_slot(&rpc_client)
217-
.await
218-
.try_into()
219-
.unwrap(),
220-
)
221-
.try_into()
222-
.unwrap(),
223-
};
224-
if let Some(snapshot_dir) = args.snapshot_dir {
225-
let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir));
226-
let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter)
227-
.await
228-
.unwrap();
229-
if !snapshot_files.is_empty() {
230-
info!("Detected snapshot files. Loading snapshot...");
231-
let last_slot = snapshot_files.last().unwrap().end_slot;
232-
// Compute the snapshot offset, if the snapshot is not more recent than this offset then don't fetch the snapshot
233-
let snapshot_offset = last_slot - args.snapshot_offset.unwrap_or(0);
234-
235-
if snapshot_offset >= last_indexed_slot {
236-
info!("Snapshot is newer than the last indexed slot. Loading snapshot...");
237-
238-
let block_stream =
239-
load_block_stream_from_directory_adapter(directory_adapter.clone()).await;
240-
pin_mut!(block_stream);
241-
let first_blocks = block_stream.next().await.unwrap();
242-
let last_stream_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot;
243-
let block_stream = stream! {
244-
yield first_blocks;
245-
while let Some(blocks) = block_stream.next().await {
246-
yield blocks;
247-
}
248-
};
249-
index_block_stream(
250-
block_stream,
251-
db_conn.clone(),
252-
rpc_client.clone(),
253-
last_stream_indexed_slot,
254-
Some(last_slot),
255-
)
256-
.await;
257-
} else {
258-
info!("Snapshot is already indexed. Skipping...");
259-
}
260-
}
261-
}
262-
263205
let (indexer_handle, monitor_handle) = match args.disable_indexing {
264206
true => {
265207
info!("Indexing is disabled");
266208
(None, None)
267209
}
268210
false => {
269211
info!("Starting indexer...");
212+
213+
let last_indexed_slot = match args.start_slot {
214+
Some(start_slot) => match start_slot.as_str() {
215+
"latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await,
216+
_ => {
217+
fetch_block_parent_slot(&rpc_client, start_slot.parse::<u64>().unwrap())
218+
.await
219+
}
220+
},
221+
None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref())
222+
.await
223+
.unwrap_or(
224+
get_network_start_slot(&rpc_client)
225+
.await
226+
.try_into()
227+
.unwrap(),
228+
)
229+
.try_into()
230+
.unwrap(),
231+
};
232+
if let Some(snapshot_dir) = args.snapshot_dir {
233+
let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir));
234+
let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter)
235+
.await
236+
.unwrap();
237+
if !snapshot_files.is_empty() {
238+
info!("Detected snapshot files. Loading snapshot...");
239+
let last_slot = snapshot_files.last().unwrap().end_slot;
240+
// Compute the snapshot offset, if the snapshot is not more recent than this offset then don't fetch the snapshot
241+
let snapshot_offset = last_slot - args.snapshot_offset.unwrap_or(0);
242+
243+
if snapshot_offset >= last_indexed_slot {
244+
info!("Snapshot is newer than the last indexed slot. Loading snapshot...");
245+
246+
let block_stream =
247+
load_block_stream_from_directory_adapter(directory_adapter.clone()).await;
248+
pin_mut!(block_stream);
249+
let first_blocks = block_stream.next().await.unwrap();
250+
let last_stream_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot;
251+
let block_stream = stream! {
252+
yield first_blocks;
253+
while let Some(blocks) = block_stream.next().await {
254+
yield blocks;
255+
}
256+
};
257+
index_block_stream(
258+
block_stream,
259+
db_conn.clone(),
260+
rpc_client.clone(),
261+
last_stream_indexed_slot,
262+
Some(last_slot),
263+
)
264+
.await;
265+
} else {
266+
info!("Snapshot is already indexed. Skipping...");
267+
}
268+
}
269+
}
270+
271+
270272
// For localnet we can safely use a large batch size to speed up indexing.
271273
let max_concurrent_block_fetches = match args.max_concurrent_block_fetches {
272274
Some(max_concurrent_block_fetches) => max_concurrent_block_fetches,

0 commit comments

Comments
 (0)