Skip to content

Commit b5a5dd8

Browse files
feat: add multiple program subscriptions (#160)
1 parent 43856e1 commit b5a5dd8

File tree

4 files changed

+44
-5
lines changed

4 files changed

+44
-5
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
target/
22
crates/mock/fixtures
33
*.env*
4+
.DS_Store
5+
**/.DS_Store
46
Vixen.toml

crates/proto/proto/stream.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package vixen.stream;
88
// updates for a given program ID.
99
service ProgramStreams {
1010
rpc Subscribe(SubscribeRequest) returns (stream SubscribeUpdate) {}
11+
rpc SubscribeMany(SubscribeManyRequest) returns (stream SubscribeUpdate) {}
1112
}
1213

1314
// Request to subscribe to a stream of updates for a given program ID.
@@ -16,6 +17,12 @@ message SubscribeRequest {
1617
string program = 1;
1718
}
1819

20+
// Request to subscribe to a stream of updates for a given program IDs.
21+
message SubscribeManyRequest {
22+
// The program IDs to subscribe to.
23+
repeated string programs = 1;
24+
}
25+
1926
// Update from the requested program containing a parsed value.
2027
message SubscribeUpdate {
2128
// The parsed value.

crates/runtime/src/handler.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{borrow::Cow, collections::HashMap, pin::Pin};
55

66
use futures_util::{Future, FutureExt, StreamExt};
77
use smallvec::SmallVec;
8-
use tracing::{warn, Instrument, Span};
8+
use tracing::{trace, Instrument, Span};
99
use vixen_core::{
1010
AccountUpdate, BlockMetaUpdate, BlockUpdate, GetPrefilter, ParserId, SlotUpdate,
1111
TransactionUpdate,
@@ -95,9 +95,9 @@ mod pipeline_error {
9595

9696
#[derive(Debug, thiserror::Error)]
9797
pub enum Error {
98-
#[error("Error parsing input value")]
98+
#[error("Error parsing input value: ({0})")]
9999
Parser(#[source] BoxedError),
100-
#[error("Handler returned an error on parsed value")]
100+
#[error("Handler returned an error on parsed value: ({0})")]
101101
Handler(#[source] BoxedError),
102102
}
103103

@@ -320,7 +320,7 @@ where I::Item: AsRef<str> + Send + 'm
320320
let pipeline = pipelines.0.get(filter);
321321

322322
if pipeline.is_none() {
323-
warn!(filter, "No pipeline matched filter on incoming update");
323+
trace!(filter, "No pipeline matched filter on incoming update");
324324
}
325325

326326
pipeline.map(|p| (f, p))

crates/stream/src/grpc.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use yellowstone_vixen_proto::{
1313
stream::{
1414
self,
1515
program_streams_server::{ProgramStreams, ProgramStreamsServer},
16-
SubscribeRequest, SubscribeUpdate,
16+
SubscribeManyRequest, SubscribeRequest, SubscribeUpdate,
1717
},
1818
tonic::{self, transport, Request, Response, Status},
1919
tonic_reflection,
@@ -46,6 +46,7 @@ pub(super) struct Service(Channels);
4646

4747
#[tonic::async_trait]
4848
impl ProgramStreams for Service {
49+
type SubscribeManyStream = futures_util::stream::SelectAll<ReceiverStream>;
4950
type SubscribeStream = futures_util::stream::SelectAll<ReceiverStream>;
5051

5152
async fn subscribe(
@@ -67,6 +68,35 @@ impl ProgramStreams for Service {
6768

6869
Ok(Response::new(stream))
6970
}
71+
72+
async fn subscribe_many(
73+
&self,
74+
request: Request<SubscribeManyRequest>,
75+
) -> Result<Response<Self::SubscribeManyStream>, Status> {
76+
let pubkeys: Vec<Pubkey> = request
77+
.into_inner()
78+
.programs
79+
.iter()
80+
.map(|p| {
81+
p.parse()
82+
.map_err(|e: yellowstone_vixen_core::KeyFromStrError| {
83+
Status::new(tonic::Code::InvalidArgument, e.to_string())
84+
})
85+
})
86+
.collect::<Result<Vec<_>, _>>()?;
87+
88+
let rxs: Vec<_> = pubkeys
89+
.iter()
90+
.filter_map(|key| self.0.get(key))
91+
.flatten()
92+
.collect();
93+
94+
// TODO: make max_tries configurable?
95+
let stream =
96+
futures_util::stream::select_all(rxs.iter().map(|rx| ReceiverStream::new(rx, 8)));
97+
98+
Ok(Response::new(stream))
99+
}
70100
}
71101

72102
type BoxedRx = Box<Receiver>;

0 commit comments

Comments
 (0)