Skip to content

Commit 7131cf9

Browse files
committed
possible fix
1 parent 304990b commit 7131cf9

File tree

1 file changed

+116
-71
lines changed

1 file changed

+116
-71
lines changed

jetstreamer-firehose/src/firehose.rs

Lines changed: 116 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,7 @@ where
545545
let entry_enabled = on_entry.is_some();
546546
let reward_enabled = on_reward.is_some();
547547
let tracking_enabled = stats_tracking.is_some();
548+
let mut last_counted_slot = slot_range.start.saturating_sub(1);
548549

549550
// let mut triggered = false;
550551
while let Err((err, slot)) = async {
@@ -942,6 +943,7 @@ where
942943
}
943944
}
944945
Block(block) => {
946+
let prev_last_counted_slot = last_counted_slot;
945947
let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
946948
(
947949
stats.slots_processed,
@@ -975,11 +977,6 @@ where
975977
error_slot,
976978
)
977979
})?;
978-
if let Some(ref mut stats) = thread_stats {
979-
stats.leader_skipped_slots += 1;
980-
stats.slots_processed += 1;
981-
stats.current_slot = skipped_slot;
982-
}
983980
counted_leader_skips =
984981
counted_leader_skips.saturating_add(1);
985982
}
@@ -1015,78 +1012,126 @@ where
10151012
this_block_rewards.clear();
10161013
}
10171014
previous_blockhash = latest_entry_blockhash;
1018-
if let (Some(stats_tracking_tmp), Some(thread_stats)) =
1019-
(&stats_tracking, &mut thread_stats)
1020-
{
1021-
let slot_increment =
1022-
counted_leader_skips.saturating_add(1);
1023-
overall_slots_processed
1024-
.fetch_add(slot_increment, Ordering::Relaxed);
1025-
overall_blocks_processed.fetch_add(1, Ordering::Relaxed);
1026-
thread_stats.blocks_processed += 1;
1027-
thread_stats.slots_processed += 1;
1028-
thread_stats.current_slot = slot;
1029-
blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1030-
slots_since_stats
1031-
.fetch_add(slot_increment, Ordering::Relaxed);
1032-
if slot % stats_tracking_tmp.tracking_interval_slots == 0 {
1033-
if let Err(err) = maybe_emit_stats(
1034-
stats_tracking.as_ref(),
1035-
thread_index,
1036-
thread_stats,
1037-
&overall_slots_processed,
1038-
&overall_blocks_processed,
1039-
&overall_transactions_processed,
1040-
&overall_entries_processed,
1041-
&transactions_since_stats,
1042-
&blocks_since_stats,
1043-
&slots_since_stats,
1044-
&last_pulse,
1045-
start_time,
1046-
)
1047-
.await
1015+
1016+
let slot_increment = slot.saturating_sub(prev_last_counted_slot);
1017+
let block_increment = if slot_increment > 0 { 1 } else { 0 };
1018+
let leader_skipped_increment = slot_increment
1019+
.saturating_sub(block_increment)
1020+
.min(counted_leader_skips);
1021+
1022+
if tracking_enabled {
1023+
if let Some(thread_stats_ref) = thread_stats.as_mut() {
1024+
if slot_increment > 0 {
1025+
thread_stats_ref.slots_processed += slot_increment;
1026+
thread_stats_ref.current_slot = slot;
1027+
}
1028+
if block_increment > 0 {
1029+
thread_stats_ref.blocks_processed += block_increment;
1030+
}
1031+
if leader_skipped_increment > 0 {
1032+
thread_stats_ref.leader_skipped_slots +=
1033+
leader_skipped_increment;
1034+
}
1035+
}
1036+
1037+
if slot_increment > 0 {
1038+
overall_slots_processed
1039+
.fetch_add(slot_increment, Ordering::Relaxed);
1040+
slots_since_stats
1041+
.fetch_add(slot_increment, Ordering::Relaxed);
1042+
}
1043+
if block_increment > 0 {
1044+
overall_blocks_processed
1045+
.fetch_add(1, Ordering::Relaxed);
1046+
blocks_since_stats.fetch_add(1, Ordering::Relaxed);
1047+
}
1048+
1049+
if let (Some(stats_tracking_tmp), Some(thread_stats_ref)) =
1050+
(&stats_tracking, &mut thread_stats)
1051+
{
1052+
if slot_increment > 0
1053+
&& slot % stats_tracking_tmp.tracking_interval_slots
1054+
== 0
10481055
{
1049-
blocks_since_stats
1050-
.fetch_sub(1, Ordering::Relaxed);
1051-
slots_since_stats.fetch_sub(
1052-
slot_increment,
1053-
Ordering::Relaxed,
1054-
);
1055-
overall_blocks_processed
1056-
.fetch_sub(1, Ordering::Relaxed);
1057-
overall_slots_processed.fetch_sub(
1058-
slot_increment,
1059-
Ordering::Relaxed,
1060-
);
1061-
if let Some((
1062-
prev_slots_processed,
1063-
prev_blocks_processed,
1064-
prev_leader_skipped,
1065-
prev_current_slot,
1066-
)) = thread_stats_snapshot
1056+
if let Err(err) = maybe_emit_stats(
1057+
stats_tracking.as_ref(),
1058+
thread_index,
1059+
thread_stats_ref,
1060+
&overall_slots_processed,
1061+
&overall_blocks_processed,
1062+
&overall_transactions_processed,
1063+
&overall_entries_processed,
1064+
&transactions_since_stats,
1065+
&blocks_since_stats,
1066+
&slots_since_stats,
1067+
&last_pulse,
1068+
start_time,
1069+
)
1070+
.await
10671071
{
1068-
thread_stats.slots_processed =
1069-
prev_slots_processed;
1070-
thread_stats.blocks_processed =
1071-
prev_blocks_processed;
1072-
thread_stats.leader_skipped_slots =
1073-
prev_leader_skipped;
1074-
thread_stats.current_slot = prev_current_slot;
1075-
} else {
1076-
thread_stats.slots_processed = thread_stats
1077-
.slots_processed
1078-
.saturating_sub(slot_increment);
1079-
thread_stats.blocks_processed = thread_stats
1080-
.blocks_processed
1081-
.saturating_sub(1);
1082-
thread_stats.leader_skipped_slots = thread_stats
1083-
.leader_skipped_slots
1084-
.saturating_sub(counted_leader_skips);
1072+
if block_increment > 0 {
1073+
blocks_since_stats
1074+
.fetch_sub(1, Ordering::Relaxed);
1075+
overall_blocks_processed
1076+
.fetch_sub(1, Ordering::Relaxed);
1077+
}
1078+
if slot_increment > 0 {
1079+
slots_since_stats.fetch_sub(
1080+
slot_increment,
1081+
Ordering::Relaxed,
1082+
);
1083+
overall_slots_processed.fetch_sub(
1084+
slot_increment,
1085+
Ordering::Relaxed,
1086+
);
1087+
}
1088+
if let Some((
1089+
prev_slots_processed,
1090+
prev_blocks_processed,
1091+
prev_leader_skipped,
1092+
prev_current_slot,
1093+
)) = thread_stats_snapshot
1094+
{
1095+
thread_stats_ref.slots_processed =
1096+
prev_slots_processed;
1097+
thread_stats_ref.blocks_processed =
1098+
prev_blocks_processed;
1099+
thread_stats_ref.leader_skipped_slots =
1100+
prev_leader_skipped;
1101+
thread_stats_ref.current_slot =
1102+
prev_current_slot;
1103+
} else {
1104+
if slot_increment > 0 {
1105+
thread_stats_ref.slots_processed =
1106+
thread_stats_ref
1107+
.slots_processed
1108+
.saturating_sub(slot_increment);
1109+
}
1110+
if block_increment > 0 {
1111+
thread_stats_ref.blocks_processed =
1112+
thread_stats_ref
1113+
.blocks_processed
1114+
.saturating_sub(block_increment);
1115+
}
1116+
if leader_skipped_increment > 0 {
1117+
thread_stats_ref.leader_skipped_slots =
1118+
thread_stats_ref
1119+
.leader_skipped_slots
1120+
.saturating_sub(
1121+
leader_skipped_increment,
1122+
);
1123+
}
1124+
}
1125+
last_counted_slot = prev_last_counted_slot;
1126+
return Err(err);
10851127
}
1086-
return Err(err);
10871128
}
10881129
}
10891130
}
1131+
1132+
if slot_increment > 0 {
1133+
last_counted_slot = slot;
1134+
}
10901135
}
10911136
Subset(_subset) => (),
10921137
Epoch(_epoch) => (),

0 commit comments

Comments
 (0)