Skip to content

Commit 506d4ed

Browse files
committed
try again
1 parent af71e09 commit 506d4ed

File tree

2 files changed

+344
-139
lines changed

2 files changed

+344
-139
lines changed

block_old.txt

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
Block(block) => {
2+
let prev_last_counted_slot = last_counted_slot;
3+
let thread_stats_snapshot = thread_stats.as_ref().map(|stats| {
4+
(
5+
stats.slots_processed,
6+
stats.blocks_processed,
7+
stats.leader_skipped_slots,
8+
stats.current_slot,
9+
)
10+
});
11+
if let Some(previous_slot) = previous_slot {
12+
for skipped_slot in (previous_slot + 1)..slot {
13+
log::debug!(
14+
target: &log_target,
15+
"leader skipped slot {} (previous slot {}, current slot {})",
16+
skipped_slot,
17+
previous_slot,
18+
slot,
19+
);
20+
if block_enabled {
21+
if let Some(on_block_cb) = on_block.as_ref() {
22+
on_block_cb(
23+
thread_index,
24+
BlockData::LeaderSkipped {
25+
slot: skipped_slot,
26+
},
27+
)
28+
.await
29+
.map_err(|e| {
30+
(
31+
FirehoseError::BlockHandlerError(e),
32+
error_slot,
33+
)
34+
})?;
35+
}
36+
}
37+
if tracking_enabled {
38+
let delta = skipped_slot
39+
.saturating_sub(last_counted_slot);
40+
if delta > 0 {
41+
overall_slots_processed
42+
.fetch_add(delta, Ordering::Relaxed);
43+
slots_since_stats
44+
.fetch_add(delta, Ordering::Relaxed);
45+
if let Some(ref mut stats) = thread_stats {
46+
stats.leader_skipped_slots += delta;
47+
stats.slots_processed += delta;
48+
stats.current_slot = skipped_slot;
49+
}
50+
last_counted_slot = skipped_slot;
51+
}
52+
}
53+
}
54+
}
55+
56+
if block_enabled {
57+
if let Some(on_block_cb) = on_block.as_ref() {
58+
let keyed_rewards = std::mem::take(&mut this_block_rewards);
59+
on_block_cb(
60+
thread_index,
61+
BlockData::Block {
62+
parent_slot: block.meta.parent_slot,
63+
parent_blockhash: previous_blockhash,
64+
slot: block.slot,
65+
blockhash: latest_entry_blockhash,
66+
rewards: KeyedRewardsAndNumPartitions {
67+
keyed_rewards,
68+
num_partitions: None,
69+
},
70+
block_time: Some(block.meta.blocktime as i64),
71+
block_height: block.meta.block_height,
72+
executed_transaction_count:
73+
this_block_executed_transaction_count,
74+
entry_count: this_block_entry_count,
75+
},
76+
)
77+
.await
78+
.map_err(|e| {
79+
(
80+
FirehoseError::BlockHandlerError(e),
81+
error_slot,
82+
)
83+
})?;
84+
}
85+
} else {
86+
this_block_rewards.clear();
87+
}
88+
previous_blockhash = latest_entry_blockhash;
89+
let last_slot_before_block = last_counted_slot;
90+
if tracking_enabled {
91+
let block_slot_delta =
92+
slot.saturating_sub(last_slot_before_block);
93+
if block_slot_delta > 0 {
94+
overall_slots_processed
95+
.fetch_add(block_slot_delta, Ordering::Relaxed);
96+
slots_since_stats
97+
.fetch_add(block_slot_delta, Ordering::Relaxed);
98+
if let Some(ref mut stats) = thread_stats {
99+
stats.slots_processed += block_slot_delta;
100+
stats.current_slot = slot;
101+
}
102+
last_counted_slot = slot;
103+
}
104+
if slot > prev_last_counted_slot {
105+
overall_blocks_processed
106+
.fetch_add(1, Ordering::Relaxed);
107+
blocks_since_stats.fetch_add(1, Ordering::Relaxed);
108+
if let Some(ref mut stats) = thread_stats {
109+
stats.blocks_processed += 1;
110+
}
111+
}
112+
}
113+
114+
let slot_increment =
115+
last_counted_slot.saturating_sub(prev_last_counted_slot);
116+
let block_increment =
117+
if tracking_enabled && slot > prev_last_counted_slot {
118+
1
119+
} else {
120+
0
121+
};
122+
let leader_skipped_increment =
123+
slot_increment.saturating_sub(block_increment);
124+
125+
if tracking_enabled {
126+
if let (Some(stats_tracking_tmp), Some(thread_stats_ref)) =
127+
(&stats_tracking, &mut thread_stats)
128+
{
129+
if slot_increment > 0
130+
&& slot % stats_tracking_tmp.tracking_interval_slots
131+
== 0
132+
{
133+
if let Err(err) = maybe_emit_stats(
134+
stats_tracking.as_ref(),
135+
thread_index,
136+
thread_stats_ref,
137+
&overall_slots_processed,
138+
&overall_blocks_processed,
139+
&overall_transactions_processed,
140+
&overall_entries_processed,
141+
&transactions_since_stats,
142+
&blocks_since_stats,
143+
&slots_since_stats,
144+
&last_pulse,
145+
start_time,
146+
)
147+
.await
148+
{
149+
if block_increment > 0 {
150+
blocks_since_stats
151+
.fetch_sub(1, Ordering::Relaxed);
152+
overall_blocks_processed
153+
.fetch_sub(1, Ordering::Relaxed);
154+
}
155+
if slot_increment > 0 {
156+
slots_since_stats.fetch_sub(
157+
slot_increment,
158+
Ordering::Relaxed,
159+
);
160+
overall_slots_processed.fetch_sub(
161+
slot_increment,
162+
Ordering::Relaxed,
163+
);
164+
}
165+
if let Some((
166+
prev_slots_processed,
167+
prev_blocks_processed,
168+
prev_leader_skipped,
169+
prev_current_slot,
170+
)) = thread_stats_snapshot
171+
{
172+
thread_stats_ref.slots_processed =
173+
prev_slots_processed;
174+
thread_stats_ref.blocks_processed =
175+
prev_blocks_processed;
176+
thread_stats_ref.leader_skipped_slots =
177+
prev_leader_skipped;
178+
thread_stats_ref.current_slot =
179+
prev_current_slot;
180+
} else {
181+
if slot_increment > 0 {
182+
thread_stats_ref.slots_processed =
183+
thread_stats_ref
184+
.slots_processed
185+
.saturating_sub(slot_increment);
186+
thread_stats_ref.current_slot =
187+
prev_last_counted_slot;
188+
}
189+
if block_increment > 0 {
190+
thread_stats_ref.blocks_processed =
191+
thread_stats_ref
192+
.blocks_processed
193+
.saturating_sub(block_increment);
194+
}
195+
if leader_skipped_increment > 0 {
196+
thread_stats_ref.leader_skipped_slots =
197+
thread_stats_ref
198+
.leader_skipped_slots
199+
.saturating_sub(
200+
leader_skipped_increment,
201+
);
202+
}
203+
}
204+
last_counted_slot = prev_last_counted_slot;
205+
return Err(err);
206+
}
207+
}
208+
}
209+
}
210+
}

0 commit comments

Comments
 (0)