Changes from all commits
41 commits
9b21327  Improve sigexists reading (gagliardetto, Dec 3, 2025)
4aed527  Improve compactindexsized reading (gagliardetto, Dec 3, 2025)
4713b2e  add rpc sanity check cli to test an old-faithful deployment (gagliardetto, Dec 3, 2025)
36a4e60  logs (gagliardetto, Dec 3, 2025)
ee08bfc  cleanup (gagliardetto, Dec 3, 2025)
5c3c96d  normalize output (gagliardetto, Dec 3, 2025)
67c6033  refactor output (gagliardetto, Dec 3, 2025)
07297c2  full sig (gagliardetto, Dec 3, 2025)
d19508a  add name (gagliardetto, Dec 3, 2025)
d458a60  sanity check: grpc load testing (gagliardetto, Dec 3, 2025)
30326c9  remove repeated work (gagliardetto, Dec 3, 2025)
9d88c49  cleanup (gagliardetto, Dec 3, 2025)
19f9358  fix (gagliardetto, Dec 3, 2025)
fa4c5f9  skip epochs flag (gagliardetto, Dec 3, 2025)
9fda7cb  clenaup (gagliardetto, Dec 3, 2025)
21816c2  first try getTx before getBlock on target (gagliardetto, Dec 3, 2025)
05b437f  fix error (gagliardetto, Dec 3, 2025)
a82fe0e  tx load (gagliardetto, Dec 3, 2025)
0a8062f  debug (gagliardetto, Dec 3, 2025)
d2f0041  zero (gagliardetto, Dec 3, 2025)
9a4b6f0  500k tx sample (gagliardetto, Dec 3, 2025)
db6c677  keep track of errors (gagliardetto, Dec 3, 2025)
1a7fb55  cleanup (gagliardetto, Dec 3, 2025)
caeeede  timeout (gagliardetto, Dec 3, 2025)
04d4626  rpc sanity check: add live dashboard (gagliardetto, Dec 3, 2025)
db07db0  disk stat collector (gagliardetto, Dec 3, 2025)
ee49423  add 0.025 to latency buckets (gagliardetto, Dec 3, 2025)
cad6d9d  add disk io to dashboard (gagliardetto, Dec 3, 2025)
eee371b  continuous tx harvesting (gagliardetto, Dec 3, 2025)
e953c77  continuous harvesting for fresh load testing (gagliardetto, Dec 3, 2025)
25e84f8  validate response signature (gagliardetto, Dec 3, 2025)
99ab02a  try different remote (gagliardetto, Dec 3, 2025)
00e3bb4  shuffle (gagliardetto, Dec 3, 2025)
dab1f5c  show config (gagliardetto, Dec 3, 2025)
26a30e3  add theme (gagliardetto, Dec 3, 2025)
722b38f  fix sig harvester (gagliardetto, Dec 3, 2025)
350a6a1  add net collector (gagliardetto, Dec 3, 2025)
44d5d61  network io (gagliardetto, Dec 3, 2025)
34a6baa  initial shuffle (gagliardetto, Dec 3, 2025)
a14f4e2  fix linter (gagliardetto, Dec 4, 2025)
2cfc5c2  flag: ignore compatible logs (gagliardetto, Dec 4, 2025)
2 changes: 1 addition & 1 deletion api.go
@@ -75,7 +75,7 @@ func (multi *MultiEpoch) apiHandler(reqCtx *fasthttp.RequestCtx) {
return
}

epochHandler, err := multi.GetEpoch(uint64(epochNumber))
epochHandler, err := multi.GetEpoch(uint64(epochNumber.Epoch))
if err != nil {
reqCtx.SetStatusCode(fasthttp.StatusNotFound) // TODO: this means epoch is not available, and probably should be another dedicated status code
return
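Side note on this fix: GetEpoch still takes a plain uint64, so the change implies the value parsed from the request path is no longer a bare integer but a struct that wraps the epoch number. A hypothetical sketch of that shape (parsedEpoch and its field are illustrative names, not types from this repo):

package main

import "fmt"

// parsedEpoch is a hypothetical stand-in for whatever the path parser now
// returns: the epoch number plus, presumably, other request-derived fields.
type parsedEpoch struct {
    Epoch uint64
}

func main() {
    epochNumber := parsedEpoch{Epoch: 670}
    // before: multi.GetEpoch(uint64(epochNumber)) compiled only while
    // epochNumber was an integer; now the numeric field is unwrapped first.
    fmt.Println(uint64(epochNumber.Epoch))
}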
5 changes: 4 additions & 1 deletion bucketteer/bucketteer_test.go
@@ -181,7 +181,10 @@ func TestBucketteer(t *testing.T) {
mmr, err := mmap.Open(path)
require.NoError(t, err)
defer mmr.Close()
reader, err := NewReader(mmr)
stat, err := os.Stat(path)
require.NoError(t, err)
require.Equal(t, stat.Size(), int64(mmr.Len()))
reader, err := NewReader(mmr, stat.Size())
require.NoError(t, err)
ok, err := reader.Has(firstSig)
require.NoError(t, err)
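The test now pins down the invariant the new API relies on: the mmap'd length must equal the on-disk size, and that size is passed to NewReader explicitly. A minimal sketch of opening an index under the two-argument signature (mirrors what OpenMMAP does internally after this PR):

package main

import (
    "os"

    "github.com/rpcpool/yellowstone-faithful/bucketteer"
    "golang.org/x/exp/mmap"
)

// openIndex opens a bucketteer index read-only via mmap and hands the real
// file size to NewReader so it can bound its internal section reader.
func openIndex(path string) (*bucketteer.Reader, error) {
    mmr, err := mmap.Open(path)
    if err != nil {
        return nil, err
    }
    st, err := os.Stat(path)
    if err != nil {
        mmr.Close()
        return nil, err
    }
    return bucketteer.NewReader(mmr, st.Size())
}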
2 changes: 1 addition & 1 deletion bucketteer/example/main.go
@@ -72,7 +72,7 @@ func main() {
panic(err)
}
defer mmr.Close()
buRd, err := bucketteer.NewReader(mmr)
buRd, err := bucketteer.NewReader(mmr, int64(mmr.Len()))
if err != nil {
panic(err)
}
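The example passes int64(mmr.Len()) rather than stat'ing the file; for a memory-mapped reader the mapped length equals the file size (the test above asserts exactly that equality), so the two forms are interchangeable here.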
265 changes: 229 additions & 36 deletions bucketteer/read.go
@@ -6,20 +6,26 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"math"
"os"
"slices"
"time"
"unsafe"

bin "github.com/gagliardetto/binary"
"github.com/rpcpool/yellowstone-faithful/indexmeta"
"github.com/valyala/bytebufferpool"
"golang.org/x/exp/mmap"
"golang.org/x/sys/unix"
)

type Reader struct {
contentReader io.ReaderAt
meta *indexmeta.Meta
prefixToOffset *bucketToOffset
prefixToSize map[uint16]uint64
contentReader io.ReaderAt
meta *indexmeta.Meta
prefixToOffset *bucketToOffset
prefixToSize map[uint16]uint64
headerTotalSize int64 // Store this to calculate real file offset
}

type bucketToOffset [math.MaxUint16 + 1]uint64
@@ -64,7 +70,14 @@ func OpenMMAP(path string) (*Reader, error) {
if err != nil {
return nil, err
}
return NewReader(file)
stat, err := os.Stat(path)
if err != nil {
return nil, err
}
if stat.Size() == 0 {
return nil, fmt.Errorf("file is empty: %s", path)
}
return NewReader(file, stat.Size())
}

func Open(path string) (*Reader, error) {
@@ -79,7 +92,14 @@ func Open(path string) (*Reader, error) {
if err != nil {
return nil, err
}
return NewReader(file)
stat, err := file.Stat()
if err != nil {
return nil, err
}
if stat.Size() == 0 {
return nil, fmt.Errorf("file is empty: %s", path)
}
return NewReader(file, stat.Size())
}

func isEmptyFile(path string) (bool, error) {
@@ -110,7 +130,7 @@ func isReaderEmpty(reader io.ReaderAt) (bool, error) {
return len(buf) == 0, nil
}

func NewReader(reader io.ReaderAt) (*Reader, error) {
func NewReader(reader io.ReaderAt, fileSize int64) (*Reader, error) {
empty, err := isReaderEmpty(reader)
if err != nil {
return nil, fmt.Errorf("failed to check if reader is empty: %w", err)
@@ -128,7 +148,139 @@ func NewReader(reader io.ReaderAt) (*Reader, error) {
r.meta = meta
r.prefixToOffset = prefixToOffset
r.prefixToSize = calcSizeOfBuckets(*prefixToOffset)
r.contentReader = io.NewSectionReader(reader, headerTotalSize, 1<<63-1)
r.headerTotalSize = headerTotalSize
r.contentReader = io.NewSectionReader(reader, headerTotalSize, fileSize-headerTotalSize)

type fileDescriptor interface {
Fd() uintptr
Name() string
}
if f, ok := reader.(fileDescriptor); ok {
// fadvise random access pattern for the whole file
err := unix.Fadvise(int(f.Fd()), 0, 0, unix.FADV_RANDOM)
if err != nil {
slog.Warn("fadvise(RANDOM) failed", "error", err)
}
{
slog.Info("Warming up drives for bucket offsets (bucketteer)...", "file", f.Name())
startedWarmup := time.Now()
dummyBuf := make([]byte, 1)
warmedBuckets := 0
for _, offset := range *r.prefixToOffset {
if offset != math.MaxUint64 {
if _, err := r.contentReader.ReadAt(dummyBuf, int64(offset)); err != nil {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
slog.Warn("Cache warmup read failed", "offset", offset, "error", err)
}
}
warmedBuckets++
}
}
slog.Info(
"Drive warmup complete",
"buckets_warmed", warmedBuckets,
"duration", time.Since(startedWarmup).String(),
"file", f.Name(),
)
}
} else {
slog.Warn("Reader does not have an Fd(); cannot use posix_fadvise to manage cache.")
}

// if klog.V(4).Enabled() {
// // debug: print all prefixes and their offsets and sizes
// sizeSum := uint64(0)
// for prefix, offset := range *prefixToOffset {
// if offset == math.MaxUint64 {
// continue
// }
// prefixAsUint16 := uint16(prefix)
// size, ok := r.prefixToSize[prefixAsUint16]
// if !ok {
// continue
// }
// sizeSum += size
// }

// // try reading one random bucket and time it
// startedReadAt := time.Now()
// prefix := uint16(0x1234)
// offset := r.prefixToOffset[prefix]
// if offset != math.MaxUint64 {
// size, ok := r.prefixToSize[prefix]
// if ok && size > 0 {
// bucketReader := io.NewSectionReader(r.contentReader, int64(offset)+4, int64(size-4))
// buf := make([]byte, size-4)
// _, err := bucketReader.Read(buf)
// if err != nil {
// return nil, fmt.Errorf("failed to read bucket for prefix %x: %w", uint16ToPrefix(prefix), err)
// }
// slog.Info(
// "debug_read_bucket",
// "prefix", uint16ToPrefix(prefix),
// "offset", offset,
// "size", size,
// "duration", time.Since(startedReadAt).String(),
// )
// }
// }
// latencies := make([]time.Duration, 0)
// for range 50 {
// // now do a search in the bucket
// sig := [64]byte{}
// rand.Read(sig[:])
// startedSearchAt := time.Now()
// found, err := r.Has(sig)
// if err != nil {
// return nil, fmt.Errorf("failed to search in bucket for prefix %x: %w", uint16ToPrefix(prefix), err)
// }
// dur := time.Since(startedSearchAt)
// slog.Info(
// "debug_search_bucket",
// "prefix", uint16ToPrefix(prefix),
// "found", found,
// "duration", dur.String(),
// )
// latencies = append(latencies, dur)
// }
// {
// sig := solana.MustSignatureFromBase58("2oSE6aiUGWCXUupFnYHgjofV8VSARaUepNDJ3vj2NCK2zFFUNNP6cinjy56vgGXD4WYrKWRkRFcvvC41TgRHM5ML")
// startedSearchAt := time.Now()
// found, err := r.Has(sig)
// if err != nil {
// return nil, fmt.Errorf("failed to search in bucket for prefix %x: %w", uint16ToPrefix(prefix), err)
// }
// dur := time.Since(startedSearchAt)
// slog.Info(
// "debug_search_bucket_known_sig",
// "prefix", uint16ToPrefix(prefix),
// "found", found,
// "duration", dur.String(),
// )
// if !found {
// return nil, fmt.Errorf("known signature not found in bucket for prefix %x", uint16ToPrefix(prefix))
// }
// }
// // print all latencies
// totalLatency := time.Duration(0)
// for _, lat := range latencies {
// totalLatency += lat
// }
// avgLatency := totalLatency / time.Duration(len(latencies))
// slog.Info(
// "debug_search_bucket_summary",
// "prefix", uint16ToPrefix(prefix),
// "num_searches", len(latencies),
// "average_duration", avgLatency.String(),
// )
// for _, lat := range latencies {
// slog.Info(
// "debug_search_bucket_latency",
// "prefix", uint16ToPrefix(prefix),
// "duration", lat.String(),
// )
// }
// }
return r, nil
}
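Two changes in this hunk are worth calling out. First, the content reader is now bounded by the actual payload size instead of the old 1<<63-1 sentinel, so out-of-range reads fail fast with io.EOF. Second, when the reader exposes a file descriptor, the file is marked FADV_RANDOM and one byte of every non-empty bucket is touched to pre-fault it into the page cache. A condensed sketch of the bounding logic (a standalone rendition, not the exact code above):

package main

import (
    "io"
    "os"
)

// boundedContent returns a reader over the payload that follows a header of
// headerTotalSize bytes, limited to the bytes that actually exist on disk.
func boundedContent(f *os.File, headerTotalSize int64) (*io.SectionReader, error) {
    st, err := f.Stat()
    if err != nil {
        return nil, err
    }
    // length = fileSize - headerTotalSize, as NewReader now computes it
    return io.NewSectionReader(f, headerTotalSize, st.Size()-headerTotalSize), nil
}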

@@ -247,46 +399,87 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, error) {
}

func (r *Reader) Has(sig [64]byte) (bool, error) {
// start := time.Now()
prefix := [2]byte{sig[0], sig[1]}
offset := r.prefixToOffset[prefixToUint16(prefix)]
if offset == math.MaxUint64 {
// This prefix doesn't exist, so the signature can't.
return false, nil
}
size, ok := r.prefixToSize[prefixToUint16(prefix)]
if !ok || size < 4 {
return false, fmt.Errorf("invalid bucket size for prefix %x", prefix)
if !ok || size == 0 {
return false, nil
}
if size < 4 {
return false, fmt.Errorf("invalid bucket size (%v) for prefix %x", size, prefix)
}
sizeMinus4 := size - 4
numHashes := sizeMinus4 / 8
if numHashes == 0 {
// Empty bucket.
return false, nil
}
bucketReader := io.NewSectionReader(r.contentReader, int64(offset)+4, int64(numHashes*8))

// hashes:
wantedHash := Hash(sig)
got, err := searchEytzinger(0, int(numHashes), wantedHash, func(index int) (uint64, error) {
pos := int64(index * 8)
return readUint64Le(bucketReader, pos)
})
if err != nil {
if errors.Is(err, ErrNotFound) {
return false, nil
// if remainder, then size is invalid
if sizeMinus4%8 != 0 {
return false, fmt.Errorf("invalid bucket size for prefix %x: size minus 4 is not multiple of 8", prefix)
}
// slog.Info(
// "has_lookup_bucket_details",
// "prefix", prefix,
// "offset", offset,
// "size", size,
// "num_hashes", numHashes,
// "duration", time.Since(start).String(),
// )
// startSectionReaderGet := time.Now()
// bucketReader := r.sectionReaders[prefixToUint16(prefix)]
// bucketReader := io.NewSectionReader(r.contentReader, int64(offset)+4, int64(numHashes*8))
bucketReader := io.NewSectionReader(r.contentReader, int64(offset+4), int64(size-4))
{
// startReadWhole := time.Now()
wholeBucketBuf := bytebufferpool.Get()
defer bytebufferpool.Put(wholeBucketBuf)
wholeBucketBuf.Reset()
// wholeBucketBuf := make([]byte, sizeMinus4)
_, err := wholeBucketBuf.ReadFrom(bucketReader)
if err != nil {
return false, fmt.Errorf("failed to read whole bucket for prefix %x: %w", prefix, err)
}
return false, err
// tookReadWhole := time.Since(startReadWhole)
// create zero-copy []uint64 from wholeBucketBuf
hashes := unsafe.Slice((*uint64)(unsafe.Pointer(&wholeBucketBuf.B[0])), numHashes)
wantedHash := Hash(sig)
foundHash, err := searchEytzingerSlice(hashes, wantedHash)
// slog.Info(
// "has_lookup_bucket_search_whole",
// "prefix", prefix,
// "offset", offset,
// "size", size,
// "num_hashes", numHashes,
// "wanted_hash", wantedHash,
// "found_hash", foundHash,
// "duration", time.Since(start).String(),
// "duration_read_whole", tookReadWhole.String(),
// "duration_section_reader_get", time.Since(startSectionReaderGet).String(),
// )
if err != nil {
if errors.Is(err, ErrNotFound) {
return false, nil
}
return false, err
}
if foundHash == wantedHash {
return true, nil
}
return false, nil
}
return got == wantedHash, nil
}

func searchEytzinger(min int, max int, x uint64, getter func(int) (uint64, error)) (uint64, error) {
func searchEytzingerSlice(hashes []uint64, x uint64) (uint64, error) {
var index int
max := len(hashes)
for index < max {
k, err := getter(index)
if err != nil {
return 0, err
}
k := hashes[index]
if k == x {
return k, nil
}
@@ -298,13 +491,13 @@ func searchEytzinger(min int, max int, x uint64, getter func(int) (uint64, error)) (uint64, error) {
return 0, ErrNotFound
}

var ErrNotFound = fmt.Errorf("not found")

func readUint64Le(reader io.ReaderAt, pos int64) (uint64, error) {
buf := make([]byte, 8)
_, err := reader.ReadAt(buf, pos)
if err != nil {
return 0, err
func init() {
// panic if os is big endian
var i uint16 = 0x1
bs := (*[2]byte)(unsafe.Pointer(&i))
if bs[0] == 0 {
panic("big endian not supported")
}
return binary.LittleEndian.Uint64(buf), nil
}

var ErrNotFound = fmt.Errorf("not found")
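The lookup path changed shape as well: instead of one 8-byte ReadAt per probe, Has now reads the whole bucket into a pooled buffer, reinterprets it as []uint64 with unsafe.Slice (hence the new init guard: the zero-copy cast only matches the little-endian on-disk format on little-endian hosts), and searches in memory. The hash array is stored in Eytzinger (BFS) order; the branch-update step falls outside the visible hunk, but a standard 0-based Eytzinger descent looks like this (a sketch, not necessarily the exact comparison direction read.go uses):

package main

import "fmt"

// searchEytzinger walks a slice laid out in Eytzinger (BFS) order: the
// children of index i live at 2i+1 and 2i+2, so every comparison descends
// one level of a virtual balanced tree, giving O(log n) probes with
// cache-friendly, front-loaded memory access.
func searchEytzinger(hashes []uint64, x uint64) bool {
    i := 0
    for i < len(hashes) {
        k := hashes[i]
        if k == x {
            return true
        }
        if k < x {
            i = 2*i + 2 // descend right
        } else {
            i = 2*i + 1 // descend left
        }
    }
    return false
}

func main() {
    // Eytzinger layout of the sorted values 1..7: root 4, children 2 and 6.
    hashes := []uint64{4, 2, 6, 1, 3, 5, 7}
    fmt.Println(searchEytzinger(hashes, 5)) // true
    fmt.Println(searchEytzinger(hashes, 9)) // false
}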