Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/plugin/sample_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"GovER5Lthms3bLBqWub97yVrMmEogzX7xNjdXpPPCVZw"
],
"pubkeys": [],
"mints": [],
"startup": false
},
"instructions": {
Expand Down
3 changes: 3 additions & 0 deletions crates/plugin/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub struct Accounts {
#[serde(default)]
pub pubkeys: HashSet<String>,

#[serde(default)]
pub mints: HashSet<String>,

/// Filter for changing how to interpret the `is_startup` flag.
///
/// This option has three states:
Expand Down
4 changes: 2 additions & 2 deletions crates/plugin/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ impl GeyserPlugin for GeyserPluginRabbitMq {
.await
.map_err(custom_err(&metrics.errs))?;

if acct_sel.screen_tokens() {
acct_sel.init_tokens(
if acct_sel.screen_token_registry() {
acct_sel.init_token_registry(
Self::load_token_reg()
.await
.map_err(custom_err(&metrics.errs))?,
Expand Down
83 changes: 60 additions & 23 deletions crates/plugin/src/selectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use crate::{
pub struct AccountSelector {
owners: HashSet<[u8; 32]>,
pubkeys: HashSet<[u8; 32]>,
mints: HashSet<Pubkey>,
startup: Option<bool>,
token_addresses: Option<HashSet<Pubkey>>,
token_reg: Option<HashSet<Pubkey>>,
}

impl AccountSelector {
Expand All @@ -24,6 +25,7 @@ impl AccountSelector {
owners,
all_tokens,
pubkeys,
mints,
startup,
} = config;

Expand All @@ -39,23 +41,37 @@ impl AccountSelector {
.collect::<Result<_, _>>()
.context("Failed to parse account pubkeys")?;

Ok(Self {
let mints = mints
.into_iter()
.map(|s| s.parse())
.collect::<Result<_, _>>()
.context("Failed to parse token account mint addresses")?;

let mut ret = Self {
owners,
pubkeys,
mints,
startup,
token_addresses: if all_tokens {
token_reg: if all_tokens {
None
} else {
Some(HashSet::new())
},
})
};

// Don't screen tokens if we're never going to return them
if !ret.owners.contains(TOKEN_KEY.as_ref()) {
ret.token_reg = None;
}

Ok(ret)
}

/// Lazy-load the token addresses. Fails if token addresses are not wanted
/// or if they have already been loaded.
pub fn init_tokens(&mut self, addrs: HashSet<Pubkey>) {
assert!(self.token_addresses.as_ref().unwrap().is_empty());
self.token_addresses = Some(addrs);
pub fn init_token_registry(&mut self, addrs: HashSet<Pubkey>) {
assert!(self.token_reg.as_ref().unwrap().is_empty());
self.token_reg = Some(addrs);
}

#[inline]
Expand All @@ -64,30 +80,44 @@ impl AccountSelector {
}

#[inline]
pub fn screen_tokens(&self) -> bool {
self.token_addresses.is_some()
pub fn screen_token_registry(&self) -> bool {
self.token_reg.is_some()
}

#[inline]
pub fn is_selected(&self, acct: &ReplicaAccountInfo, is_startup: bool) -> bool {
let ReplicaAccountInfo { owner, data, .. } = *acct;

if self.startup.map_or(false, |s| is_startup != s)
|| !(self.owners.contains(owner) || self.pubkeys.contains(acct.pubkey))
{
if self.startup.map_or(false, |s| is_startup != s) {
return false;
}

if owner == TOKEN_KEY.as_ref() && data.len() == TokenAccount::get_packed_len() {
if let Some(ref addrs) = self.token_addresses {
let token_account = TokenAccount::unpack_from_slice(data);
if self.pubkeys.contains(acct.pubkey) {
return true;
}

if let Ok(token_account) = token_account {
if token_account.amount > 1 || addrs.contains(&token_account.mint) {
return false;
}
}
}
let token = if (self.token_reg.is_some() || !self.mints.is_empty())
&& owner == TOKEN_KEY.as_ref()
&& data.len() == TokenAccount::get_packed_len()
{
TokenAccount::unpack_from_slice(data).ok()
} else {
None
};

if token.map_or(false, |t| self.mints.contains(&t.mint)) {
return true;
}

if !self.owners.contains(owner) {
return false;
}

if token
.zip(self.token_reg.as_ref())
.map_or(false, |(t, r)| t.amount > 1 || r.contains(&t.mint))
{
return false;
}

true
Expand All @@ -113,10 +143,17 @@ impl InstructionSelector {
.collect::<Result<_, _>>()
.context("Failed to parse instruction program keys")?;

Ok(Self {
let mut ret = Self {
programs,
screen_tokens: !all_token_calls,
})
};

// Don't screen token calls if we're never going to return them
if !ret.programs.contains(&TOKEN_KEY) {
ret.screen_tokens = false;
}

Ok(ret)
}

#[inline]
Expand Down
8 changes: 4 additions & 4 deletions crates/rabbitmq/src/geyser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ impl QueueType {
queue,
binding: Binding::Fanout,
prefetch: 4096,
max_len_bytes: if suffix.is_debug() || matches!(startup_type, StartupType::Normal) {
512 * 1024 * 1024 // 512 MiB
} else {
12 * 1024 * 1024 * 1024 // 12 GiB
max_len_bytes: match (suffix.is_debug(), startup_type) {
(true, _) => 100 * 1024 * 1024, // 100 MiB
(false, StartupType::Normal) => 4 * 1024 * 1024 * 1024, // 4 GiB
(false, _) => 12 * 1024 * 1024 * 1024, // 12 GiB
},
auto_delete: suffix.is_debug(),
retry: Some(RetryProps {
Expand Down