Skip to content

Commit 56b0ef2

Browse files
authored
Merge pull request #23 from holaplex/dev
v0.6 Release
2 parents c33467e + 5b66df7 commit 56b0ef2

File tree

11 files changed

+470
-680
lines changed

11 files changed

+470
-680
lines changed

Cargo.lock

Lines changed: 351 additions & 661 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/plugin/Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "holaplex-indexer-rabbitmq-geyser"
3-
version = "0.5.0"
3+
version = "0.6.0"
44
authors = [
55
"ryans <[email protected]>",
66
]
@@ -28,14 +28,14 @@ parking_lot = "0.12.0"
2828
reqwest = "0.11.6"
2929
serde = { version = "1.0.133", features = ["derive"] }
3030
serde_json = "1.0.75"
31-
serde_with = "1.11.0"
31+
serde_with = "2.0.0"
3232
tokio-executor-trait = "2.1.0"
3333
tokio-reactor-trait = "1.1.0"
3434

35-
solana-geyser-plugin-interface = "1.10.25"
36-
solana-logger = "1.10.25"
37-
solana-program = "1.10.25"
38-
solana-metrics = "1.10.25"
35+
solana-geyser-plugin-interface = "~1.10.25"
36+
solana-logger = "~1.10.25"
37+
solana-program = "~1.10.25"
38+
solana-metrics = "~1.10.25"
3939
spl-token = "3.3.0"
4040

4141
[dependencies.tokio]
@@ -51,7 +51,7 @@ features = [
5151

5252
[dependencies.indexer-rabbitmq]
5353
package = "holaplex-indexer-rabbitmq"
54-
version = "=0.2.0"
54+
version = "=0.3.0"
5555
path = "../rabbitmq"
5656
default-features = false
5757
features = ["producer", "geyser"]

crates/plugin/sample_config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"HAbiTatJVqoCJd9asyr6RxMEdwtfrQugwp7VAFyKWb1g",
3333
"GovER5Lthms3bLBqWub97yVrMmEogzX7xNjdXpPPCVZw"
3434
],
35+
"pubkeys": [],
3536
"startup": false
3637
},
3738
"instructions": {

crates/plugin/src/config.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
};
88

99
#[derive(Debug, Deserialize)]
10-
#[serde(rename_all = "camelCase")]
10+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
1111
pub struct Config {
1212
amqp: Amqp,
1313
jobs: Jobs,
@@ -17,11 +17,15 @@ pub struct Config {
1717

1818
accounts: Accounts,
1919
instructions: Instructions,
20+
21+
/// Unused but required by the validator to load the plugin
22+
#[allow(dead_code)]
23+
libpath: String,
2024
}
2125

2226
#[serde_with::serde_as]
2327
#[derive(Debug, Deserialize)]
24-
#[serde(rename_all = "camelCase")]
28+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
2529
pub struct Amqp {
2630
pub address: String,
2731

@@ -30,7 +34,7 @@ pub struct Amqp {
3034
}
3135

3236
#[derive(Debug, Deserialize)]
33-
#[serde(rename_all = "camelCase")]
37+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
3438
pub struct Jobs {
3539
pub limit: usize,
3640

@@ -39,22 +43,27 @@ pub struct Jobs {
3943
}
4044

4145
#[derive(Debug, Default, Deserialize)]
42-
#[serde(rename_all = "camelCase")]
46+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
4347
pub struct Metrics {
4448
pub config: Option<String>,
4549
}
4650

4751
#[derive(Debug, Deserialize)]
4852
#[serde(rename_all = "camelCase", deny_unknown_fields)]
4953
pub struct Accounts {
54+
#[serde(default)]
5055
pub owners: HashSet<String>,
5156

57+
#[serde(default)]
58+
pub pubkeys: HashSet<String>,
59+
5260
/// Filter for changing how to interpret the `is_startup` flag.
5361
///
5462
/// This option has three states:
5563
/// - `None`: Ignore the `is_startup` flag and send all updates.
5664
/// - `Some(true)`: Only send updates when `is_startup` is `true`.
5765
/// - `Some(false)`: Only send updates when `is_startup` is `false`.
66+
#[serde(default)]
5867
pub startup: Option<bool>,
5968

6069
/// Set to true to disable heuristics to reduce the number of incoming
@@ -67,6 +76,7 @@ pub struct Accounts {
6776
#[derive(Debug, Deserialize)]
6877
#[serde(rename_all = "camelCase", deny_unknown_fields)]
6978
pub struct Instructions {
79+
#[serde(default)]
7080
pub programs: HashSet<String>,
7181

7282
/// Set to true to disable heuristics to reduce the number of incoming
@@ -93,6 +103,7 @@ impl Config {
93103
metrics,
94104
accounts,
95105
instructions,
106+
libpath: _,
96107
} = self;
97108

98109
let acct =

crates/plugin/src/selectors.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
#[derive(Debug)]
1414
pub struct AccountSelector {
1515
owners: HashSet<[u8; 32]>,
16+
pubkeys: HashSet<[u8; 32]>,
1617
startup: Option<bool>,
1718
token_addresses: Option<HashSet<Pubkey>>,
1819
}
@@ -22,6 +23,7 @@ impl AccountSelector {
2223
let Accounts {
2324
owners,
2425
all_tokens,
26+
pubkeys,
2527
startup,
2628
} = config;
2729

@@ -31,8 +33,15 @@ impl AccountSelector {
3133
.collect::<Result<_, _>>()
3234
.context("Failed to parse account owner keys")?;
3335

36+
let pubkeys = pubkeys
37+
.into_iter()
38+
.map(|s| s.parse().map(Pubkey::to_bytes))
39+
.collect::<Result<_, _>>()
40+
.context("Failed to parse account pubkeys")?;
41+
3442
Ok(Self {
3543
owners,
44+
pubkeys,
3645
startup,
3746
token_addresses: if all_tokens {
3847
None
@@ -63,7 +72,9 @@ impl AccountSelector {
6372
pub fn is_selected(&self, acct: &ReplicaAccountInfo, is_startup: bool) -> bool {
6473
let ReplicaAccountInfo { owner, data, .. } = *acct;
6574

66-
if self.startup.map_or(false, |s| is_startup != s) || !self.owners.contains(owner) {
75+
if self.startup.map_or(false, |s| is_startup != s)
76+
|| !(self.owners.contains(owner) || self.pubkeys.contains(acct.pubkey))
77+
{
6778
return false;
6879
}
6980

crates/rabbitmq/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "holaplex-indexer-rabbitmq"
3-
version = "0.2.0"
3+
version = "0.3.0"
44
authors = [
55
"ryans <[email protected]>",
66
]
@@ -18,6 +18,7 @@ consumer = ["suffix"]
1818
default = ["consumer"]
1919
geyser = ["solana-program", "suffix"]
2020
http-indexer = ["solana-program", "suffix"]
21+
job-runner = []
2122
producer = ["suffix"]
2223
search-indexer = ["serde_json", "solana-program", "suffix"]
2324
suffix = ["clap"]
@@ -33,5 +34,5 @@ serde = { version = "1.0.133", features = ["derive"] }
3334
serde_json = { version = "1.0.79", optional = true }
3435
# TODO: tighten this bound once more things support 1.10
3536
solana-program = { version = ">=1.9", optional = true }
36-
strum = { version = "0.23.0", features = ["derive"] }
37+
strum = { version = "0.24.1", features = ["derive"] }
3738
thiserror = "1.0.30"

crates/rabbitmq/src/geyser.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ impl QueueType {
125125
binding: Binding::Fanout,
126126
prefetch: 4096,
127127
max_len_bytes: if suffix.is_debug() || matches!(startup_type, StartupType::Normal) {
128-
100 * 1024 * 1024 // 100 MiB
128+
512 * 1024 * 1024 // 512 MiB
129129
} else {
130-
8 * 1024 * 1024 * 1024 // 8 GiB
130+
12 * 1024 * 1024 * 1024 // 12 GiB
131131
},
132132
auto_delete: suffix.is_debug(),
133133
retry: Some(RetryProps {

crates/rabbitmq/src/http_indexer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ pub trait Entity: std::fmt::Debug + Serialize + for<'a> Deserialize<'a> {
4747

4848
/// A name to use when declaring queues and exchanges
4949
const ID: Self::Id;
50+
51+
/// The maximum queue length to use for this entity
52+
const MAX_LEN_BYTES: i64;
5053
}
5154

5255
/// Fetch the off-chain JSON for a metadata account
@@ -67,6 +70,8 @@ impl Entity for MetadataJson {
6770
type Id = EntityId;
6871

6972
const ID: EntityId = EntityId::MetadataJson;
73+
74+
const MAX_LEN_BYTES: i64 = 512 * 1024 * 1024; // 512 MiB
7075
}
7176

7277
/// Fetch the off-chain JSON config for a storefront
@@ -82,6 +87,8 @@ impl Entity for StoreConfig {
8287
type Id = EntityId;
8388

8489
const ID: EntityId = EntityId::StoreConfig;
90+
91+
const MAX_LEN_BYTES: i64 = 100 * 1024 * 1024; // 100 MiB
8592
}
8693

8794
impl<E: Entity> QueueType<E> {
@@ -100,7 +107,7 @@ impl<E: Entity> QueueType<E> {
100107
queue,
101108
binding: Binding::Fanout,
102109
prefetch: 1024,
103-
max_len_bytes: 100 * 1024 * 1024, // 100 MiB
110+
max_len_bytes: E::MAX_LEN_BYTES,
104111
auto_delete: suffix.is_debug(),
105112
retry: Some(RetryProps {
106113
max_tries: 8,

crates/rabbitmq/src/job_runner.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
//! Queue configuration for dispatching background jobs.
2+
3+
use std::time::Duration;
4+
5+
use serde::{Deserialize, Serialize};
6+
7+
use crate::{
8+
queue_type::{Binding, QueueProps, RetryProps},
9+
suffix::Suffix,
10+
Result,
11+
};
12+
13+
/// Message data for a job dispatch request
14+
#[derive(Debug, Clone, Serialize, Deserialize)]
15+
pub enum Message {
16+
/// Refresh a table of cached data
17+
RefreshTable(String),
18+
}
19+
20+
/// AMQP configuration for job runners
21+
#[derive(Debug, Clone)]
22+
pub struct QueueType {
23+
props: QueueProps,
24+
}
25+
26+
impl QueueType {
27+
/// Construct a new queue configuration given the expected sender and queue
28+
/// suffix configuration
29+
///
30+
/// # Errors
31+
/// This function fails if the given queue suffix is invalid.
32+
pub fn new(sender: &str, suffix: &Suffix) -> Result<Self> {
33+
let exchange = format!("{}.jobs", sender);
34+
let queue = suffix.format(format!("{}.runner", exchange))?;
35+
36+
Ok(Self {
37+
props: QueueProps {
38+
exchange,
39+
queue,
40+
binding: Binding::Fanout,
41+
prefetch: 1,
42+
max_len_bytes: 100 * 1024 * 1024, // 100 MiB
43+
auto_delete: suffix.is_debug(),
44+
retry: Some(RetryProps {
45+
max_tries: 5,
46+
delay_hint: Duration::from_secs(5),
47+
max_delay: Duration::from_secs(10 * 60),
48+
}),
49+
},
50+
})
51+
}
52+
}
53+
54+
impl crate::QueueType for QueueType {
55+
type Message = Message;
56+
57+
#[inline]
58+
fn info(&self) -> crate::queue_type::QueueInfo {
59+
(&self.props).into()
60+
}
61+
}
62+
63+
/// The type of an search indexer producer
64+
#[cfg(feature = "producer")]
65+
pub type Producer = crate::producer::Producer<QueueType>;
66+
/// The type of an search indexer consumer
67+
#[cfg(feature = "consumer")]
68+
pub type Consumer = crate::consumer::Consumer<QueueType>;

crates/rabbitmq/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ pub mod dl_consumer;
4646
pub mod geyser;
4747
#[cfg(feature = "http-indexer")]
4848
pub mod http_indexer;
49+
#[cfg(feature = "job-runner")]
50+
pub mod job_runner;
4951
#[cfg(feature = "producer")]
5052
pub mod producer;
5153
mod queue_type;

0 commit comments

Comments
 (0)