Skip to content

Commit 226c07c

Browse files
committed
rework index.rs to actually cache
1 parent 27a7527 commit 226c07c

File tree

1 file changed

+149
-50
lines changed

1 file changed

+149
-50
lines changed

jetstreamer-firehose/src/index.rs

Lines changed: 149 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ pub static SLOT_OFFSET_INDEX: Lazy<SlotOffsetIndex> = Lazy::new(|| {
8989

9090
static SLOT_OFFSET_RESULT_CACHE: Lazy<DashMap<u64, u64>> = Lazy::new(DashMap::new);
9191
static EPOCH_CACHE: Lazy<DashMap<EpochCacheKey, Arc<EpochEntry>>> = Lazy::new(DashMap::new);
92+
static INDEX_FILE_CACHE: Lazy<DashMap<String, Arc<IndexFileCacheEntry>>> = Lazy::new(DashMap::new);
9293

9394
#[derive(Clone, Hash, PartialEq, Eq)]
9495
struct EpochCacheKey {
@@ -128,6 +129,10 @@ struct EpochEntry {
128129
once: OnceCell<Arc<EpochIndex>>,
129130
}
130131

132+
struct IndexFileCacheEntry {
133+
once: OnceCell<Arc<Vec<u8>>>,
134+
}
135+
131136
struct EpochIndex {
132137
slot_range: Range<u64>,
133138
slot_index: RemoteCompactIndex,
@@ -154,6 +159,25 @@ impl EpochEntry {
154159
}
155160
}
156161

162+
impl IndexFileCacheEntry {
163+
fn new() -> Self {
164+
Self {
165+
once: OnceCell::new(),
166+
}
167+
}
168+
169+
async fn get_or_load<F, Fut>(&self, loader: F) -> Result<Arc<Vec<u8>>, SlotOffsetIndexError>
170+
where
171+
F: FnOnce() -> Fut,
172+
Fut: Future<Output = Result<Arc<Vec<u8>>, SlotOffsetIndexError>>,
173+
{
174+
self.once
175+
.get_or_try_init(|| async { loader().await })
176+
.await
177+
.map(Arc::clone)
178+
}
179+
}
180+
157181
impl EpochIndex {
158182
async fn offset_for_slot(&self, slot: u64) -> Result<u64, SlotOffsetIndexError> {
159183
if !self.slot_range.contains(&slot) {
@@ -318,6 +342,97 @@ impl SlotOffsetIndex {
318342
}
319343
}
320344

345+
async fn get_cached_index_bytes(
346+
client: &Client,
347+
url: &Url,
348+
kind_label: &str,
349+
) -> Result<Arc<Vec<u8>>, SlotOffsetIndexError> {
350+
let key = url.as_str().to_owned();
351+
let entry = match INDEX_FILE_CACHE.entry(key) {
352+
Entry::Occupied(occupied) => Arc::clone(occupied.get()),
353+
Entry::Vacant(vacant) => {
354+
let new_entry = Arc::new(IndexFileCacheEntry::new());
355+
vacant.insert(Arc::clone(&new_entry));
356+
new_entry
357+
}
358+
};
359+
360+
entry
361+
.get_or_load(|| async { download_full_index(client, url, kind_label).await })
362+
.await
363+
}
364+
365+
async fn download_full_index(
366+
client: &Client,
367+
url: &Url,
368+
kind_label: &str,
369+
) -> Result<Arc<Vec<u8>>, SlotOffsetIndexError> {
370+
let epoch_hint = url
371+
.path_segments()
372+
.and_then(|mut segments| segments.next_back())
373+
.and_then(|name| name.split('-').nth(1))
374+
.and_then(|value| value.parse::<u64>().ok());
375+
376+
if let Some(epoch) = epoch_hint {
377+
info!(
378+
target: LOG_MODULE,
379+
"Fetching {kind_label} compact index for epoch {epoch}"
380+
);
381+
} else {
382+
info!(target: LOG_MODULE, "Fetching {kind_label} compact index");
383+
}
384+
385+
let mut attempt = 0usize;
386+
loop {
387+
let response = client
388+
.get(url.clone())
389+
.send()
390+
.await
391+
.map_err(|err| SlotOffsetIndexError::NetworkError(url.clone(), err))?;
392+
393+
if response.status() == StatusCode::NOT_FOUND {
394+
return Err(SlotOffsetIndexError::EpochIndexFileNotFound(url.clone()));
395+
}
396+
397+
if response.status() == StatusCode::TOO_MANY_REQUESTS
398+
|| response.status() == StatusCode::SERVICE_UNAVAILABLE
399+
{
400+
if attempt < FETCH_RANGE_MAX_RETRIES {
401+
let delay_ms = FETCH_RANGE_BASE_DELAY_MS.saturating_mul(1u64 << attempt.min(10));
402+
warn!(
403+
target: LOG_MODULE,
404+
"HTTP {} fetching {}; retrying in {} ms (attempt {}/{})",
405+
response.status(),
406+
url,
407+
delay_ms,
408+
attempt + 1,
409+
FETCH_RANGE_MAX_RETRIES
410+
);
411+
sleep(Duration::from_millis(delay_ms)).await;
412+
attempt += 1;
413+
continue;
414+
}
415+
return Err(SlotOffsetIndexError::HttpStatusError(
416+
url.clone(),
417+
response.status(),
418+
));
419+
}
420+
421+
if !response.status().is_success() {
422+
return Err(SlotOffsetIndexError::HttpStatusError(
423+
url.clone(),
424+
response.status(),
425+
));
426+
}
427+
428+
let bytes = response
429+
.bytes()
430+
.await
431+
.map_err(|err| SlotOffsetIndexError::NetworkError(url.clone(), err))?;
432+
return Ok(Arc::new(bytes.to_vec()));
433+
}
434+
}
435+
321436
fn decode_offset_and_size(value: &[u8], url: &Url) -> Result<(u64, u32), SlotOffsetIndexError> {
322437
if value.len() != RemoteCompactIndex::OFFSET_AND_SIZE_VALUE_SIZE as usize {
323438
return Err(SlotOffsetIndexError::IndexFormatError(
@@ -340,10 +455,10 @@ fn decode_offset_and_size(value: &[u8], url: &Url) -> Result<(u64, u32), SlotOff
340455
}
341456

342457
struct RemoteCompactIndex {
343-
client: Client,
344458
url: Url,
345459
header: CompactIndexHeader,
346460
bucket_entries: DashMap<u32, Arc<BucketEntry>>,
461+
file_bytes: Arc<Vec<u8>>,
347462
}
348463

349464
impl RemoteCompactIndex {
@@ -356,30 +471,19 @@ impl RemoteCompactIndex {
356471
expected_value_size: Option<u64>,
357472
) -> Result<Self, SlotOffsetIndexError> {
358473
let kind_label = std::str::from_utf8(expected_kind).unwrap_or("index");
359-
let epoch_hint = url
360-
.path_segments()
361-
.and_then(|mut segments| segments.next_back())
362-
.and_then(|name| name.split('-').nth(1))
363-
.and_then(|value| value.parse::<u64>().ok());
364-
if let Some(epoch) = epoch_hint {
365-
info!(
366-
target: LOG_MODULE,
367-
"Fetching {kind_label} compact index for epoch {epoch}"
368-
);
369-
} else {
370-
info!(
371-
target: LOG_MODULE,
372-
"Fetching {kind_label} compact index"
373-
);
374-
}
375474

376-
let header =
377-
fetch_and_parse_header(&client, &url, expected_kind, expected_value_size).await?;
475+
let file_bytes = get_cached_index_bytes(&client, &url, kind_label).await?;
476+
let header = parse_compact_index_header(
477+
file_bytes.as_ref(),
478+
&url,
479+
expected_kind,
480+
expected_value_size,
481+
)?;
378482
Ok(Self {
379-
client,
380483
url,
381484
header,
382485
bucket_entries: DashMap::new(),
486+
file_bytes,
383487
})
384488
}
385489

@@ -424,39 +528,38 @@ impl RemoteCompactIndex {
424528
async fn load_bucket(&self, index: u32) -> Result<Arc<BucketData>, SlotOffsetIndexError> {
425529
let bucket_header_offset =
426530
self.header.header_size + (index as u64) * BUCKET_HEADER_SIZE as u64;
427-
let header_bytes = fetch_range(
428-
&self.client,
429-
&self.url,
430-
bucket_header_offset,
431-
bucket_header_offset + BUCKET_HEADER_SIZE as u64 - 1,
432-
true,
433-
)
434-
.await?;
435-
if header_bytes.len() != BUCKET_HEADER_SIZE {
531+
let header_end = bucket_header_offset.saturating_add(BUCKET_HEADER_SIZE as u64);
532+
if header_end > self.file_bytes.len() as u64 {
436533
return Err(SlotOffsetIndexError::IndexFormatError(
437534
self.url.clone(),
438535
format!(
439-
"expected {BUCKET_HEADER_SIZE} bucket header bytes, got {}",
440-
header_bytes.len()
536+
"bucket header {index} exceeds file bounds (offset {}, file len {})",
537+
bucket_header_offset,
538+
self.file_bytes.len()
441539
),
442540
));
443541
}
444-
let bucket_header = BucketHeader::from_bytes(header_bytes.try_into().unwrap());
542+
let header_offset = bucket_header_offset as usize;
543+
let mut header_buf = [0u8; BUCKET_HEADER_SIZE];
544+
header_buf
545+
.copy_from_slice(&self.file_bytes[header_offset..header_offset + BUCKET_HEADER_SIZE]);
546+
let bucket_header = BucketHeader::from_bytes(header_buf);
445547
let stride = bucket_header.hash_len as usize + self.header.value_size as usize;
446548
let data_len = stride * bucket_header.num_entries as usize;
447549
let data_start = bucket_header.file_offset;
448-
let data_end = data_start + data_len as u64 - 1;
449-
let data = fetch_range(&self.client, &self.url, data_start, data_end, true).await?;
450-
if data.len() != data_len {
550+
let data_end = data_start.saturating_add(data_len as u64);
551+
if data_end > self.file_bytes.len() as u64 {
451552
return Err(SlotOffsetIndexError::IndexFormatError(
452553
self.url.clone(),
453554
format!(
454-
"expected {} bytes of bucket data, got {}",
555+
"bucket data for index {index} exceeds file bounds (offset {}, len {}, file len {})",
556+
bucket_header.file_offset,
455557
data_len,
456-
data.len()
558+
self.file_bytes.len()
457559
),
458560
));
459561
}
562+
let data = self.file_bytes[data_start as usize..data_end as usize].to_vec();
460563

461564
Ok(Arc::new(BucketData::new(
462565
bucket_header,
@@ -616,13 +719,12 @@ fn hash_uint64(mut x: u64) -> u64 {
616719
x
617720
}
618721

619-
async fn fetch_and_parse_header(
620-
client: &Client,
722+
fn parse_compact_index_header(
723+
bytes: &[u8],
621724
url: &Url,
622725
expected_kind: &[u8],
623726
expected_value_size: Option<u64>,
624727
) -> Result<CompactIndexHeader, SlotOffsetIndexError> {
625-
let mut bytes = fetch_range(client, url, 0, HTTP_PREFETCH_BYTES - 1, false).await?;
626728
if bytes.len() < 12 {
627729
return Err(SlotOffsetIndexError::IndexFormatError(
628730
url.clone(),
@@ -638,16 +740,13 @@ async fn fetch_and_parse_header(
638740
let header_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
639741
let total_header_size = 8 + 4 + header_len;
640742
if bytes.len() < total_header_size {
641-
bytes = fetch_range(client, url, 0, total_header_size as u64 - 1, false).await?;
642-
if bytes.len() < total_header_size {
643-
return Err(SlotOffsetIndexError::IndexFormatError(
644-
url.clone(),
645-
format!(
646-
"incomplete index header: expected {total_header_size} bytes, got {}",
647-
bytes.len()
648-
),
649-
));
650-
}
743+
return Err(SlotOffsetIndexError::IndexFormatError(
744+
url.clone(),
745+
format!(
746+
"incomplete index header: expected {total_header_size} bytes, got {}",
747+
bytes.len()
748+
),
749+
));
651750
}
652751

653752
let value_size = u64::from_le_bytes(bytes[12..20].try_into().unwrap());

0 commit comments

Comments
 (0)