Skip to content

Commit d1abcf2

Browse files
authored
Merge pull request #32 from notwedtm/fix/legacy-epoch-support
fix: full support for legacy epochs
2 parents da64e62 + 26173e6 commit d1abcf2

File tree

4 files changed

+186
-45
lines changed

4 files changed

+186
-45
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ thiserror = { version = "2", default-features = false }
3535
reqwest = { version = "0.12", default-features = false }
3636
anyhow = { version = "1", default-features = false }
3737
base64 = "0.22"
38+
bincode = "1"
3839
serial_test = "2"
3940
wincode = { version = "0.2", features = ["derive", "solana-short-vec"] }
4041
bs58 = { version = "0", default-features = false }

jetstreamer-firehose/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ fnv.workspace = true
5353
serde_json.workspace = true
5454
zstd.workspace = true
5555
solana-reward-info.workspace = true
56+
bincode.workspace = true
5657
xxhash-rust.workspace = true
5758
dashmap.workspace = true
5859
once_cell.workspace = true

jetstreamer-firehose/src/firehose.rs

Lines changed: 183 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ use crate::{
4646
// header, seeking, reading next block). Adjust here to tune stall detection/restart
4747
// aggressiveness.
4848
const OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
49+
// Epochs earlier than this were bincode-encoded in Old Faithful.
50+
const BINCODE_EPOCH_CUTOFF: u64 = 157;
4951

5052
fn poll_shutdown(
5153
flag: &Arc<std::sync::atomic::AtomicBool>,
@@ -344,6 +346,173 @@ fn clear_pending_skip(map: &DashMap<usize, DashSet<u64>>, thread_id: usize, slot
344346
.unwrap_or(false)
345347
}
346348

349+
fn decode_transaction_status_meta_from_frame(
350+
slot: u64,
351+
reassembled_metadata: Vec<u8>,
352+
) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
353+
if reassembled_metadata.is_empty() {
354+
// Early epochs often omit metadata entirely.
355+
return Ok(solana_transaction_status::TransactionStatusMeta::default());
356+
}
357+
358+
match utils::decompress_zstd(reassembled_metadata.clone()) {
359+
Ok(decompressed) => {
360+
decode_transaction_status_meta(slot, decompressed.as_slice()).map_err(|err| {
361+
Box::new(std::io::Error::other(format!(
362+
"decode transaction metadata (slot {slot}): {err}"
363+
))) as SharedError
364+
})
365+
}
366+
Err(decomp_err) => {
367+
// If the frame was not zstd-compressed (common for very early data), try to
368+
// decode the raw bytes directly before bailing.
369+
decode_transaction_status_meta(slot, reassembled_metadata.as_slice()).map_err(|err| {
370+
Box::new(std::io::Error::other(format!(
371+
"transaction metadata not zstd-compressed for slot {slot}; raw decode failed (raw_err={err}, decompress_err={decomp_err})"
372+
))) as SharedError
373+
})
374+
}
375+
}
376+
}
377+
378+
fn decode_transaction_status_meta(
379+
slot: u64,
380+
metadata_bytes: &[u8],
381+
) -> Result<solana_transaction_status::TransactionStatusMeta, SharedError> {
382+
let epoch = slot_to_epoch(slot);
383+
let mut bincode_err: Option<String> = None;
384+
if epoch < BINCODE_EPOCH_CUTOFF {
385+
match bincode::deserialize::<solana_storage_proto::StoredTransactionStatusMeta>(
386+
metadata_bytes,
387+
) {
388+
Ok(stored) => return Ok(stored.into()),
389+
Err(err) => {
390+
bincode_err = Some(err.to_string());
391+
}
392+
}
393+
}
394+
395+
let bin_err_for_proto = bincode_err.clone();
396+
let proto: solana_storage_proto::convert::generated::TransactionStatusMeta =
397+
prost_011::Message::decode(metadata_bytes).map_err(|err| {
398+
// If we already tried bincode, surface both failures for easier debugging.
399+
if let Some(ref bin_err) = bin_err_for_proto {
400+
Box::new(std::io::Error::other(format!(
401+
"protobuf decode transaction metadata failed (epoch {epoch}); bincode failed earlier: {bin_err}; protobuf error: {err}"
402+
))) as SharedError
403+
} else {
404+
Box::new(std::io::Error::other(format!(
405+
"protobuf decode transaction metadata: {err}"
406+
))) as SharedError
407+
}
408+
})?;
409+
410+
proto.try_into().map_err(|err| {
411+
if let Some(ref bin_err) = bincode_err {
412+
Box::new(std::io::Error::other(format!(
413+
"convert transaction metadata proto failed (epoch {epoch}); bincode failed earlier: {bin_err}; conversion error: {err}"
414+
))) as SharedError
415+
} else {
416+
Box::new(std::io::Error::other(format!(
417+
"convert transaction metadata proto: {err}"
418+
))) as SharedError
419+
}
420+
})
421+
}
422+
423+
#[cfg(test)]
424+
mod metadata_decode_tests {
425+
use super::{decode_transaction_status_meta, decode_transaction_status_meta_from_frame};
426+
use solana_message::v0::LoadedAddresses;
427+
use solana_storage_proto::StoredTransactionStatusMeta;
428+
use solana_transaction_status::TransactionStatusMeta;
429+
430+
fn sample_meta() -> TransactionStatusMeta {
431+
let mut meta = TransactionStatusMeta::default();
432+
meta.fee = 42;
433+
meta.pre_balances = vec![1, 2];
434+
meta.post_balances = vec![3, 4];
435+
meta.log_messages = Some(vec!["hello".into()]);
436+
meta.pre_token_balances = Some(Vec::new());
437+
meta.post_token_balances = Some(Vec::new());
438+
meta.rewards = Some(Vec::new());
439+
meta.compute_units_consumed = Some(7);
440+
meta.cost_units = Some(9);
441+
meta.loaded_addresses = LoadedAddresses::default();
442+
meta
443+
}
444+
445+
#[test]
446+
fn decodes_bincode_metadata_for_early_epochs() {
447+
let stored = StoredTransactionStatusMeta {
448+
status: Ok(()),
449+
fee: 42,
450+
pre_balances: vec![1, 2],
451+
post_balances: vec![3, 4],
452+
inner_instructions: None,
453+
log_messages: Some(vec!["hello".into()]),
454+
pre_token_balances: Some(Vec::new()),
455+
post_token_balances: Some(Vec::new()),
456+
rewards: Some(Vec::new()),
457+
return_data: None,
458+
compute_units_consumed: Some(7),
459+
cost_units: Some(9),
460+
};
461+
let bytes = bincode::serialize(&stored).expect("bincode serialize");
462+
let decoded = decode_transaction_status_meta(0, &bytes).expect("decode");
463+
assert_eq!(decoded, TransactionStatusMeta::from(stored));
464+
}
465+
466+
#[test]
467+
fn decodes_protobuf_metadata_for_later_epochs() {
468+
let meta = sample_meta();
469+
let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
470+
meta.clone().into();
471+
let bytes = prost_011::Message::encode_to_vec(&generated);
472+
let decoded = decode_transaction_status_meta(157 * 432000, &bytes).expect("decode");
473+
assert_eq!(decoded, meta);
474+
}
475+
476+
#[test]
477+
fn falls_back_to_proto_when_early_epoch_bytes_are_proto() {
478+
let meta = sample_meta();
479+
let generated: solana_storage_proto::convert::generated::TransactionStatusMeta =
480+
meta.clone().into();
481+
let bytes = prost_011::Message::encode_to_vec(&generated);
482+
// Epoch 100 should try bincode first; if those bytes are proto, we must fall back.
483+
let decoded = decode_transaction_status_meta(100 * 432000, &bytes).expect("decode");
484+
assert_eq!(decoded, meta);
485+
}
486+
487+
#[test]
488+
fn empty_frame_decodes_to_default() {
489+
let decoded = decode_transaction_status_meta_from_frame(0, Vec::new()).expect("decode");
490+
assert_eq!(decoded, TransactionStatusMeta::default());
491+
}
492+
493+
#[test]
494+
fn raw_bincode_frame_without_zstd_still_decodes() {
495+
let stored = StoredTransactionStatusMeta {
496+
status: Ok(()),
497+
fee: 1,
498+
pre_balances: vec![],
499+
post_balances: vec![],
500+
inner_instructions: None,
501+
log_messages: None,
502+
pre_token_balances: Some(Vec::new()),
503+
post_token_balances: Some(Vec::new()),
504+
rewards: Some(Vec::new()),
505+
return_data: None,
506+
compute_units_consumed: None,
507+
cost_units: None,
508+
};
509+
let raw_bytes = bincode::serialize(&stored).expect("serialize");
510+
let decoded =
511+
decode_transaction_status_meta_from_frame(0, raw_bytes).expect("decode fallback");
512+
assert_eq!(decoded, TransactionStatusMeta::from(stored));
513+
}
514+
}
515+
347516
/// Firehose transaction payload passed to [`Handler`] callbacks.
348517
#[derive(Debug, Clone)]
349518
pub struct TransactionData {
@@ -860,40 +1029,16 @@ where
8601029
)
8611030
})?;
8621031

863-
let decompressed =
864-
utils::decompress_zstd(reassembled_metadata.clone())
865-
.map_err(|err| {
866-
(
867-
FirehoseError::NodeDecodingError(
868-
item_index,
869-
err,
870-
),
871-
error_slot,
872-
)
873-
})?;
874-
875-
let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
876-
prost_011::Message::decode(decompressed.as_slice())
877-
.map_err(|err| {
878-
(
879-
FirehoseError::NodeDecodingError(
880-
item_index,
881-
Box::new(err),
882-
),
883-
error_slot,
884-
)
885-
})?;
886-
887-
let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
888-
metadata.try_into().map_err(|err| {
889-
(
890-
FirehoseError::NodeDecodingError(
891-
item_index,
892-
Box::new(err),
893-
),
894-
error_slot,
895-
)
896-
})?;
1032+
let as_native_metadata = decode_transaction_status_meta_from_frame(
1033+
block.slot,
1034+
reassembled_metadata,
1035+
)
1036+
.map_err(|err| {
1037+
(
1038+
FirehoseError::NodeDecodingError(item_index, err),
1039+
error_slot,
1040+
)
1041+
})?;
8971042

8981043
let message_hash = {
8991044
#[cfg(feature = "verify-transaction-signatures")]
@@ -1829,17 +1974,10 @@ async fn firehose_geyser_thread(
18291974
let versioned_tx = tx.as_parsed()?;
18301975
let reassembled_metadata = nodes.reassemble_dataframes(tx.metadata.clone())?;
18311976

1832-
let decompressed = utils::decompress_zstd(reassembled_metadata.clone())?;
1833-
1834-
let metadata: solana_storage_proto::convert::generated::TransactionStatusMeta =
1835-
prost_011::Message::decode(decompressed.as_slice()).map_err(|err| {
1836-
Box::new(std::io::Error::other(
1837-
std::format!("Error decoding metadata: {:?}", err),
1838-
))
1839-
})?;
1840-
1841-
let as_native_metadata: solana_transaction_status::TransactionStatusMeta =
1842-
metadata.try_into()?;
1977+
let as_native_metadata = decode_transaction_status_meta_from_frame(
1978+
block.slot,
1979+
reassembled_metadata,
1980+
)?;
18431981

18441982
let message_hash = {
18451983
#[cfg(feature = "verify-transaction-signatures")]

0 commit comments

Comments
 (0)