Skip to content

Commit 81d0d7e

Browse files
committed
maybe
1 parent 3e6ebed commit 81d0d7e

File tree

1 file changed

+87
-27
lines changed

1 file changed

+87
-27
lines changed

jetstreamer-firehose/src/firehose.rs

Lines changed: 87 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -679,44 +679,55 @@ where
679679
epoch_num,
680680
block.slot
681681
);
682-
let mut slot = block.slot;
682+
let slot = block.slot;
683683
if slot >= slot_range.end {
684684
log::info!(target: &log_target, "reached end of slot range at slot {}", slot);
685685
// Return early to terminate the firehose thread cleanly. We use >=
686686
// because slot_range is half-open [start, end), so any slot equal
687687
// to end is out-of-range and must not be processed.
688688

689-
// still need to emit skipped slots up to end-1
690-
slot = slot_range.end;
691-
if let (Some(on_block_cb), Some(previous_slot)) =
692-
(on_block.as_ref(), previous_slot)
693-
{
694-
for skipped_slot in (previous_slot + 2)..slot {
695-
log::debug!(
696-
target: &log_target,
697-
"leader skipped slot {} (previous slot {}, current slot {})",
698-
skipped_slot,
699-
previous_slot,
700-
slot,
701-
);
702-
if block_enabled {
703-
on_block_cb(
704-
thread_index,
705-
BlockData::LeaderSkipped { slot: skipped_slot },
706-
)
707-
.await
708-
.map_err(|e| FirehoseError::BlockHandlerError(e))
709-
.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
689+
let target_last_slot = slot_range.end.saturating_sub(1);
690+
let skip_start_from_previous = previous_slot
691+
.map(|s| s.saturating_add(1))
692+
.unwrap_or(slot_range.start);
693+
let first_untracked_slot = last_counted_slot.saturating_add(1);
694+
let skip_start = std::cmp::max(skip_start_from_previous, first_untracked_slot);
695+
696+
if skip_start <= target_last_slot {
697+
if block_enabled {
698+
if let Some(on_block_cb) = on_block.as_ref() {
699+
for skipped_slot in skip_start..=target_last_slot {
700+
log::debug!(
701+
target: &log_target,
702+
"leader skipped slot {} (prev_counted {}, target {})",
703+
skipped_slot,
704+
last_counted_slot,
705+
target_last_slot,
706+
);
707+
on_block_cb(
708+
thread_index,
709+
BlockData::LeaderSkipped { slot: skipped_slot },
710+
)
711+
.await
712+
.map_err(|e| FirehoseError::BlockHandlerError(e))
713+
.map_err(|e| (e, current_slot.unwrap_or(slot_range.start)))?;
714+
}
710715
}
716+
}
717+
718+
let missing_slots = target_last_slot.saturating_sub(skip_start) + 1;
719+
if tracking_enabled {
711720
if let Some(ref mut stats) = thread_stats {
712-
stats.leader_skipped_slots += 1;
713-
stats.slots_processed += 1;
714-
stats.current_slot = skipped_slot;
721+
stats.leader_skipped_slots += missing_slots;
722+
stats.slots_processed += missing_slots;
723+
stats.current_slot = target_last_slot;
715724
}
716-
slots_since_stats.fetch_add(1, Ordering::Relaxed);
717-
fetch_add_if(tracking_enabled, &overall_slots_processed, 1);
725+
overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
726+
slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
718727
}
728+
last_counted_slot = target_last_slot;
719729
}
730+
720731
return Ok(());
721732
}
722733
debug_assert!(slot < slot_range.end, "processing out-of-range slot {} (end {})", slot, slot_range.end);
@@ -991,6 +1002,19 @@ where
9911002
newly_skipped_slots = newly_skipped_slots.saturating_add(1);
9921003
}
9931004

1005+
let block_is_new = slot > prev_last_counted_slot;
1006+
1007+
if !block_is_new {
1008+
log::debug!(
1009+
target: &log_target,
1010+
"duplicate block {}, already counted (last_counted={})",
1011+
slot,
1012+
prev_last_counted_slot,
1013+
);
1014+
this_block_rewards.clear();
1015+
continue;
1016+
}
1017+
9941018
if block_enabled {
9951019
if let Some(on_block_cb) = on_block.as_ref() {
9961020
let keyed_rewards = std::mem::take(&mut this_block_rewards);
@@ -1273,6 +1297,42 @@ where
12731297
return Ok(());
12741298
}
12751299
}
1300+
if let Some(expected_last_slot) = slot_range.end.checked_sub(1) {
1301+
if last_counted_slot < expected_last_slot {
1302+
let flush_start = last_counted_slot.saturating_add(1);
1303+
if block_enabled {
1304+
if let Some(on_block_cb) = on_block.as_ref() {
1305+
let error_slot = current_slot.unwrap_or(slot_range.start);
1306+
for skipped_slot in flush_start..=expected_last_slot {
1307+
log::debug!(
1308+
target: &log_target,
1309+
"leader skipped slot {} during final flush (prev_counted {})",
1310+
skipped_slot,
1311+
last_counted_slot,
1312+
);
1313+
on_block_cb(
1314+
thread_index,
1315+
BlockData::LeaderSkipped { slot: skipped_slot },
1316+
)
1317+
.await
1318+
.map_err(|e| FirehoseError::BlockHandlerError(e))
1319+
.map_err(|e| (e, error_slot))?;
1320+
}
1321+
}
1322+
}
1323+
let missing_slots = expected_last_slot.saturating_sub(last_counted_slot);
1324+
if tracking_enabled {
1325+
if let Some(stats_ref) = thread_stats.as_mut() {
1326+
stats_ref.leader_skipped_slots += missing_slots;
1327+
stats_ref.slots_processed += missing_slots;
1328+
stats_ref.current_slot = expected_last_slot;
1329+
}
1330+
overall_slots_processed.fetch_add(missing_slots, Ordering::Relaxed);
1331+
slots_since_stats.fetch_add(missing_slots, Ordering::Relaxed);
1332+
}
1333+
last_counted_slot = expected_last_slot;
1334+
}
1335+
}
12761336
if let Some(ref mut stats) = thread_stats {
12771337
stats.finish_time = Some(std::time::Instant::now());
12781338
maybe_emit_stats(

0 commit comments

Comments
 (0)