Skip to content

Commit 5b66df7

Browse files
authored
Merge pull request #22 from holaplex/ryans/increased-limits
Job Runner + Increased Limits
2 parents 5b854a4 + af2848e commit 5b66df7

File tree

8 files changed

+87
-10
lines changed

8 files changed

+87
-10
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/plugin/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "holaplex-indexer-rabbitmq-geyser"
3-
version = "0.5.0"
3+
version = "0.6.0"
44
authors = [
55
"ryans <[email protected]>",
66
]
@@ -51,7 +51,7 @@ features = [
5151

5252
[dependencies.indexer-rabbitmq]
5353
package = "holaplex-indexer-rabbitmq"
54-
version = "=0.2.0"
54+
version = "=0.3.0"
5555
path = "../rabbitmq"
5656
default-features = false
5757
features = ["producer", "geyser"]

crates/rabbitmq/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "holaplex-indexer-rabbitmq"
3-
version = "0.2.0"
3+
version = "0.3.0"
44
authors = [
55
"ryans <[email protected]>",
66
]
@@ -18,6 +18,7 @@ consumer = ["suffix"]
1818
default = ["consumer"]
1919
geyser = ["solana-program", "suffix"]
2020
http-indexer = ["solana-program", "suffix"]
21+
job-runner = []
2122
producer = ["suffix"]
2223
search-indexer = ["serde_json", "solana-program", "suffix"]
2324
suffix = ["clap"]

crates/rabbitmq/src/geyser.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ impl QueueType {
125125
binding: Binding::Fanout,
126126
prefetch: 4096,
127127
max_len_bytes: if suffix.is_debug() || matches!(startup_type, StartupType::Normal) {
128-
100 * 1024 * 1024 // 100 MiB
128+
512 * 1024 * 1024 // 512 MiB
129129
} else {
130-
8 * 1024 * 1024 * 1024 // 8 GiB
130+
12 * 1024 * 1024 * 1024 // 12 GiB
131131
},
132132
auto_delete: suffix.is_debug(),
133133
retry: Some(RetryProps {

crates/rabbitmq/src/http_indexer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ pub trait Entity: std::fmt::Debug + Serialize + for<'a> Deserialize<'a> {
4747

4848
/// A name to use when declaring queues and exchanges
4949
const ID: Self::Id;
50+
51+
/// The maximum queue length to use for this entity
52+
const MAX_LEN_BYTES: i64;
5053
}
5154

5255
/// Fetch the off-chain JSON for a metadata account
@@ -67,6 +70,8 @@ impl Entity for MetadataJson {
6770
type Id = EntityId;
6871

6972
const ID: EntityId = EntityId::MetadataJson;
73+
74+
const MAX_LEN_BYTES: i64 = 512 * 1024 * 1024; // 512 MiB
7075
}
7176

7277
/// Fetch the off-chain JSON config for a storefront
@@ -82,6 +87,8 @@ impl Entity for StoreConfig {
8287
type Id = EntityId;
8388

8489
const ID: EntityId = EntityId::StoreConfig;
90+
91+
const MAX_LEN_BYTES: i64 = 100 * 1024 * 1024; // 100 MiB
8592
}
8693

8794
impl<E: Entity> QueueType<E> {
@@ -100,7 +107,7 @@ impl<E: Entity> QueueType<E> {
100107
queue,
101108
binding: Binding::Fanout,
102109
prefetch: 1024,
103-
max_len_bytes: 100 * 1024 * 1024, // 100 MiB
110+
max_len_bytes: E::MAX_LEN_BYTES,
104111
auto_delete: suffix.is_debug(),
105112
retry: Some(RetryProps {
106113
max_tries: 8,

crates/rabbitmq/src/job_runner.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
//! Queue configuration for dispatching background jobs.
2+
3+
use std::time::Duration;
4+
5+
use serde::{Deserialize, Serialize};
6+
7+
use crate::{
8+
queue_type::{Binding, QueueProps, RetryProps},
9+
suffix::Suffix,
10+
Result,
11+
};
12+
13+
/// Message data for a job dispatch request
14+
#[derive(Debug, Clone, Serialize, Deserialize)]
15+
pub enum Message {
16+
/// Refresh a table of cached data
17+
RefreshTable(String),
18+
}
19+
20+
/// AMQP configuration for job runners
21+
#[derive(Debug, Clone)]
22+
pub struct QueueType {
23+
props: QueueProps,
24+
}
25+
26+
impl QueueType {
27+
/// Construct a new queue configuration given the expected sender and queue
28+
/// suffix configuration
29+
///
30+
/// # Errors
31+
/// This function fails if the given queue suffix is invalid.
32+
pub fn new(sender: &str, suffix: &Suffix) -> Result<Self> {
33+
let exchange = format!("{}.jobs", sender);
34+
let queue = suffix.format(format!("{}.runner", exchange))?;
35+
36+
Ok(Self {
37+
props: QueueProps {
38+
exchange,
39+
queue,
40+
binding: Binding::Fanout,
41+
prefetch: 1,
42+
max_len_bytes: 100 * 1024 * 1024, // 100 MiB
43+
auto_delete: suffix.is_debug(),
44+
retry: Some(RetryProps {
45+
max_tries: 5,
46+
delay_hint: Duration::from_secs(5),
47+
max_delay: Duration::from_secs(10 * 60),
48+
}),
49+
},
50+
})
51+
}
52+
}
53+
54+
impl crate::QueueType for QueueType {
55+
type Message = Message;
56+
57+
#[inline]
58+
fn info(&self) -> crate::queue_type::QueueInfo {
59+
(&self.props).into()
60+
}
61+
}
62+
63+
/// The type of an search indexer producer
64+
#[cfg(feature = "producer")]
65+
pub type Producer = crate::producer::Producer<QueueType>;
66+
/// The type of an search indexer consumer
67+
#[cfg(feature = "consumer")]
68+
pub type Consumer = crate::consumer::Consumer<QueueType>;

crates/rabbitmq/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ pub mod dl_consumer;
4646
pub mod geyser;
4747
#[cfg(feature = "http-indexer")]
4848
pub mod http_indexer;
49+
#[cfg(feature = "job-runner")]
50+
pub mod job_runner;
4951
#[cfg(feature = "producer")]
5052
pub mod producer;
5153
mod queue_type;

crates/rabbitmq/src/queue_type.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,7 @@ impl<'a> QueueInfo<'a> {
279279
let mut queue_fields = FieldTable::default();
280280
queue_fields.insert(
281281
"x-max-length-bytes".into(),
282-
// Top out length at 100 MiB
283-
AMQPValue::LongLongInt(self.0.max_len_bytes.min(100 * 1024 * 1024)),
282+
AMQPValue::LongLongInt(self.0.max_len_bytes),
284283
);
285284

286285
// TODO: add a true DL queue

0 commit comments

Comments
 (0)