Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 143 additions & 148 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions crates/plugin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "holaplex-indexer-rabbitmq-geyser"
version = "0.7.0"
version = "0.8.0"
authors = [
"ryans <[email protected]>",
]
Expand All @@ -25,19 +25,19 @@ hashbrown = { version = "0.12.3", features = ["serde"] }
hostname = "0.3.1"
log = "0.4.17"
parking_lot = "0.12.1"
reqwest = "0.11.12"
serde = { version = "1.0.147", features = ["derive"] }
serde_json = "1.0.87"
reqwest = "0.11.13"
serde = { version = "1.0.149", features = ["derive"] }
serde_json = "1.0.89"
serde_with = "1.14.0"
tokio-executor-trait = "2.1.0"
tokio-reactor-trait = "1.1.0"

solana-geyser-plugin-interface = "~1.10.39"
solana-logger = "~1.10.39"
solana-metrics = "~1.10.39"
solana-program = "~1.10.39"
solana-transaction-status = "~1.10.39"
spl-token = "3.3.1"
solana-geyser-plugin-interface = "~1.13.5"
solana-logger = "~1.13.5"
solana-metrics = "~1.13.5"
solana-program = "~1.13.5"
solana-transaction-status = "~1.13.5"
spl-token = "3.5.0"

[dependencies.tokio]
version = "1.21.2"
Expand All @@ -52,13 +52,13 @@ features = [

[dependencies.selector]
package = "holaplex-indexer-geyser-selector"
version = "=0.1.0"
version = "=0.1.1"
path = "../selector"
default-features = false

[dependencies.indexer-rabbitmq]
package = "holaplex-indexer-rabbitmq"
version = "=0.4.0"
version = "=0.4.1"
path = "../rabbitmq"
default-features = false
features = ["producer", "geyser"]
13 changes: 10 additions & 3 deletions crates/plugin/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,20 @@ fn main() {
);
}

println!(
"cargo:rustc-env=META_BUILD_PLATFORM=ptr{},{},{}",
print!(
"cargo:rustc-env=META_BUILD_PLATFORM=ptr{},{}",
env::var("CARGO_CFG_TARGET_POINTER_WIDTH").unwrap(),
env::var("CARGO_CFG_TARGET_ENDIAN").unwrap(),
env::var("CARGO_CFG_TARGET_FEATURE").unwrap(),
);

if let Some(feat) = env::var_os("CARGO_CFG_TARGET_FEATURE") {
if !feat.is_empty() {
print!(",{}", feat.into_string().unwrap());
}
}

println!();

let toplevel = Command::new("git")
.arg("rev-parse")
.arg("--show-toplevel")
Expand Down
2 changes: 1 addition & 1 deletion crates/plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod plugin;
pub(crate) mod selector;
pub(crate) mod sender;

pub use plugin::GeyserPluginRabbitMq;
pub(crate) use plugin::GeyserPluginRabbitMq;

#[no_mangle]
#[allow(improper_ctypes_definitions)]
Expand Down
16 changes: 12 additions & 4 deletions crates/plugin/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,25 @@ impl Counter {

#[derive(Debug)]
pub struct Metrics {
pub sends: Counter,
pub recvs: Counter,
pub acct_sends: Counter,
pub acct_recvs: Counter,
pub ins_sends: Counter,
pub txn_recvs: Counter,
pub status_sends: Counter,
pub status_recvs: Counter,
pub errs: Counter,
pub reconnects: Counter,
}

impl Metrics {
pub fn new_rc() -> Arc<Self> {
Arc::new(Self {
sends: Counter::new("geyser_sends", Level::Info),
recvs: Counter::new("geyser_recvs", Level::Info),
acct_sends: Counter::new("geyser_acct_sends", Level::Info),
acct_recvs: Counter::new("geyser_acct_recvs", Level::Info),
ins_sends: Counter::new("geyser_ins_sends", Level::Info),
txn_recvs: Counter::new("geyser_txn_recvs", Level::Info),
status_sends: Counter::new("geyser_status_sends", Level::Info),
status_recvs: Counter::new("geyser_status_recvs", Level::Info),
errs: Counter::new("geyser_errs", Level::Error),
reconnects: Counter::new("geyser_reconnects", Level::Error),
})
Expand Down
12 changes: 6 additions & 6 deletions crates/plugin/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl GeyserPlugin for GeyserPluginRabbitMq {
self.with_inner(
|| GeyserPluginError::AccountsUpdateError { msg: UNINIT.into() },
|this| {
this.metrics.recvs.log(1);
this.metrics.acct_recvs.log(1);

match account {
ReplicaAccountInfoVersions::V0_0_1(acct) => {
Expand Down Expand Up @@ -250,7 +250,7 @@ impl GeyserPlugin for GeyserPluginRabbitMq {
is_startup,
}))
.await;
this.metrics.sends.log(1);
this.metrics.acct_sends.log(1);

Ok(())
});
Expand All @@ -271,7 +271,7 @@ impl GeyserPlugin for GeyserPluginRabbitMq {
self.with_inner(
|| GeyserPluginError::SlotStatusUpdateError { msg: UNINIT.into() },
|this| {
this.metrics.recvs.log(1);
this.metrics.status_recvs.log(1);

this.spawn(|this| async move {
this.producer
Expand All @@ -285,7 +285,7 @@ impl GeyserPlugin for GeyserPluginRabbitMq {
},
}))
.await;
this.metrics.sends.log(1);
this.metrics.status_sends.log(1);

Ok(())
});
Expand Down Expand Up @@ -346,7 +346,7 @@ impl GeyserPlugin for GeyserPluginRabbitMq {
return Ok(());
}

this.metrics.recvs.log(1);
this.metrics.txn_recvs.log(1);

match transaction {
ReplicaTransactionInfoVersions::V0_0_1(tx) => {
Expand Down Expand Up @@ -386,7 +386,7 @@ impl GeyserPlugin for GeyserPluginRabbitMq {
Ok(Some(m)) => {
this.spawn(|this| async move {
this.producer.send(m).await;
this.metrics.sends.log(1);
this.metrics.ins_sends.log(1);

Ok(())
});
Expand Down
8 changes: 4 additions & 4 deletions crates/rabbitmq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "holaplex-indexer-rabbitmq"
version = "0.4.0"
version = "0.4.1"
authors = [
"ryans <[email protected]>",
]
Expand Down Expand Up @@ -29,9 +29,9 @@ lapin = "2.1.1"
log = "0.4.17"
rand = "0.8.5"
rmp-serde = "1.1.1"
serde = { version = "1.0.147", features = ["derive"] }
serde_json = { version = "1.0.87", optional = true }
serde = { version = "1.0.149", features = ["derive"] }
serde_json = { version = "1.0.89", optional = true }
# TODO: tighten this bound once more things support 1.10
solana-program = { version = ">=1.9,<1.11", optional = true }
solana-program = { version = ">=1.9,<1.14", optional = true }
strum = { version = "0.24.1", features = ["derive"] }
thiserror = "1.0.37"
76 changes: 47 additions & 29 deletions crates/rabbitmq/src/queue_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ pub struct RetryProps {
pub max_delay: Duration,
}

impl RetryProps {
#[inline]
fn dl_exchange(&self, props: &QueueProps) -> String {
#[allow(clippy::drop_ref)]
std::mem::drop(self); // self isn't used, but is required to exist for
// this method to make sense
format!("dlx.{}", props.queue)
}

// returns (exchange, queue, triage_queue)
#[inline]
fn dl_info(&self, props: &QueueProps) -> (String, String, String) {
(
self.dl_exchange(props),
format!("dlq.{}", props.queue),
format!("triage.dlq.{}", props.queue),
)
}
}

#[derive(Debug, Clone)]
pub struct QueueProps {
pub exchange: String,
Expand Down Expand Up @@ -123,18 +143,6 @@ impl<'a> QueueInfo<'a> {

#[cfg(feature = "consumer")]
impl<'a> QueueInfo<'a> {
fn dl_exchange(self) -> String {
format!("dlx.{}", self.0.queue)
}

fn dl_queue(self) -> String {
format!("dlq.{}", self.0.queue)
}

fn dl_triage_queue(self) -> String {
format!("triage.dlq.{}", self.0.queue)
}

async fn queue_declare(self, chan: &Channel) -> Result<()> {
let mut queue_fields = FieldTable::default();

Expand All @@ -143,15 +151,17 @@ impl<'a> QueueInfo<'a> {
AMQPValue::LongLongInt(self.0.max_len_bytes),
);

queue_fields.insert(
"x-dead-letter-exchange".into(),
AMQPValue::LongString(self.dl_exchange().into()),
);
if let Some(ref retry) = self.0.retry {
queue_fields.insert(
"x-dead-letter-exchange".into(),
AMQPValue::LongString(retry.dl_exchange(self.0).into()),
);

queue_fields.insert(
"x-dead-letter-routing-key".into(),
AMQPValue::LongString(DLX_TRIAGE_KEY.into()),
);
queue_fields.insert(
"x-dead-letter-routing-key".into(),
AMQPValue::LongString(DLX_TRIAGE_KEY.into()),
);
}

chan.queue_declare(
self.0.queue.as_ref(),
Expand All @@ -167,24 +177,27 @@ impl<'a> QueueInfo<'a> {
}

/// Returns (`dl_exchange`, `dl_queue`, `dl_triage_queue`)
async fn dl_exchange_declare(self, chan: &Channel) -> Result<(String, String, String)> {
async fn dl_exchange_declare(self, chan: &Channel) -> Result<Option<(String, String, String)>> {
let mut exchg_fields = FieldTable::default();

let retry = if let Some(retry) = self.0.retry {
retry
} else {
return Ok(None);
};

exchg_fields.insert(
"x-message-ttl".into(),
AMQPValue::LongLongInt(
self.0
.retry
.as_ref()
.ok_or(Error::InvalidQueueType("Missing retry info"))?
retry
.max_delay
.as_millis()
.try_into()
.map_err(|_| Error::InvalidQueueType("Max delay overflowed i64"))?,
),
);

let exchg = self.dl_exchange();
let (exchg, queue, triage) = retry.dl_info(self.0);

chan.exchange_declare(
exchg.as_ref(),
Expand All @@ -197,15 +210,17 @@ impl<'a> QueueInfo<'a> {
)
.await?;

Ok((exchg, self.dl_queue(), self.dl_triage_queue()))
Ok(Some((exchg, queue, triage)))
}

pub(crate) async fn init_consumer(
self,
chan: &Channel,
tag: impl AsRef<str>,
) -> Result<Consumer> {
self.dl_exchange_declare(chan).await?;
if self.0.retry.is_some() {
self.dl_exchange_declare(chan).await?;
}
self.exchange_declare(chan).await?;
self.queue_declare(chan).await?;

Expand Down Expand Up @@ -235,7 +250,10 @@ impl<'a> QueueInfo<'a> {
self,
chan: &Channel,
) -> Result<(Consumer, DlConsumerInfo)> {
let (exchange, queue, triage_queue) = self.dl_exchange_declare(chan).await?;
let (exchange, queue, triage_queue) = self
.dl_exchange_declare(chan)
.await?
.ok_or(Error::InvalidQueueType("Missing retry info"))?;

{
let mut queue_fields = FieldTable::default();
Expand Down
8 changes: 4 additions & 4 deletions crates/selector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "holaplex-indexer-geyser-selector"
version = "0.1.0"
version = "0.1.1"
authors = [
"Holaplex Engineering <[email protected]>",
]
Expand All @@ -17,16 +17,16 @@ categories = ["cryptography::cryptocurrencies", "web-programming"]
hashbrown = { version = "0.12.3", features = ["serde"] }
log = "0.4.17"
once_cell = "1.16.0"
serde = { version = "1.0.147", features = ["derive"] }
serde = { version = "1.0.149", features = ["derive"] }
thiserror = "1.0.37"

# TODO: tighten the >=1.9 bounds once more things support 1.10
solana-program = ">=1.9, <1.11"
solana-program = ">=1.9, <1.14"
spl-token = ">=3.2, <3.6"

[dependencies.indexer-rabbitmq]
package = "holaplex-indexer-rabbitmq"
version = "=0.4.0"
version = "=0.4.1"
path = "../rabbitmq"
default-features = false
features = ["producer", "geyser"]