Skip to content

Commit 6620b2f

Browse files
authored
Add support for DEXs, pools, and tokens in blockchain adapter (#2638)
1 parent 85ef8a7 commit 6620b2f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+3358
-59
lines changed

Cargo.lock

Lines changed: 689 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ tokio = { version = "1.45.0", features = ["full"] }
114114
tokio-tungstenite = { version = "0.26.2", features = ["rustls-tls-native-roots"] }
115115
ustr = { version = "1.1.0", features = ["serde"] }
116116
uuid = { version = "1.16.0", features = ["v4", "serde"] }
117+
sqlx = { version = "0.8.5", features = [
118+
"postgres",
119+
"runtime-tokio",
120+
"json"
121+
]}
117122

118123
# dev-dependencies
119124
axum = "0.8.4"

crates/adapters/blockchain/Cargo.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ nautilus-model = { workspace = true, features = ["defi"] }
3131
nautilus-core = { workspace = true }
3232
nautilus-network = { workspace = true, features = ["python"] } # TODO: Untangle python feature
3333
nautilus-common = { workspace = true }
34+
nautilus-infrastructure = { workspace = true, features = ["postgres"] }
3435

3536
anyhow = { workspace = true }
3637
async-trait = { workspace = true }
@@ -49,6 +50,10 @@ tracing = { workspace = true }
4950
ustr = { workspace = true }
5051
hypersync-client = { version = "0.18.2", optional = true }
5152
hypersync-schema = { version = "0.3.0", optional = true }
53+
alloy = { version = "1.0.3"}
54+
hex = { version = "0.4.3" }
55+
sqlx = {workspace = true}
56+
bytes = { workspace = true }
5257

5358
[dev-dependencies]
5459
rstest = { workspace = true }
@@ -62,3 +67,8 @@ required-features = ["hypersync"]
6267
name = "live_blocks_hypersync"
6368
path = "bin/watch_hypersync_live_blocks.rs"
6469
required-features = ["hypersync"]
70+
71+
[[bin]]
72+
name = "sync_tokens_pools"
73+
path = "bin/sync_tokens_pools.rs"
74+
required-features = ["hypersync"]

crates/adapters/blockchain/README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ You can configure the required environment variables in two ways:
1111
```
1212
CHAIN=Ethereum
1313
RPC_WSS_URL=wss://mainnet.infura.io/ws/v3/YOUR_INFURA_API_KEY
14+
RPC_HTTP_URL=https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY
1415
```
1516

1617
2. **Providing variables directly in the command line:**
@@ -24,11 +25,11 @@ You can configure the required environment variables in two ways:
2425
The scripts will connect to the specified blockchain and log information about each new block received for both the RPC version and only Hypersync.
2526

2627
```
27-
cargo run --bin live_blocks_rpc
28+
cargo run --bin live_blocks_rpc --features hypersync
2829
```
2930

3031
```
31-
cargo run --bin live_blocks_hypersync
32+
cargo run --bin live_blocks_hypersync --features hypersync
3233
```
3334

3435
For RPC example, the output should be:
@@ -42,3 +43,10 @@ Running `target/debug/live_blocks_rpc`
4243
^C2025-04-25T14:55:38.314022000Z [INFO] TRADER-001.live_blocks: Shutdown signal received, shutting down...
4344
4445
```
46+
47+
### Sync dex, tokens and pool for Uniswap V3 on Ethereum
48+
This script demonstrates how to use the blockchain data client to discover and cache Uniswap V3 pools and their associated tokens. It queries the Ethereum blockchain for pool creation events emitted by the Uniswap V3 factory contract, retrieves token metadata (name, symbol, decimals) for each token in the pools via smart contract calls, and stores everything in a local Postgres database.
49+
50+
```
51+
cargo run --bin sync_tokens_pools --features hypersync
52+
```
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// -------------------------------------------------------------------------------------------------
2+
// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3+
// https://nautechsystems.io
4+
//
5+
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6+
// You may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
// -------------------------------------------------------------------------------------------------
15+
16+
use std::sync::Arc;
17+
18+
use nautilus_blockchain::{config::BlockchainAdapterConfig, data::BlockchainDataClient, exchanges};
19+
use nautilus_common::logging::{
20+
logger::{Logger, LoggerConfig},
21+
writer::FileWriterConfig,
22+
};
23+
use nautilus_core::UUID4;
24+
use nautilus_model::{
25+
defi::chain::{Blockchain, Chain, chains},
26+
identifiers::TraderId,
27+
};
28+
use tokio::sync::Notify;
29+
30+
#[tokio::main]
31+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
32+
dotenvy::dotenv().ok();
33+
// Setup logger
34+
let _logger_guard = Logger::init_with_config(
35+
TraderId::default(),
36+
UUID4::new(),
37+
LoggerConfig::default(),
38+
FileWriterConfig::new(None, None, None, None),
39+
)?;
40+
41+
// Setup graceful shutdown with signal handling in different task
42+
let notify = Arc::new(Notify::new());
43+
let notifier = notify.clone();
44+
tokio::spawn(async move {
45+
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
46+
.expect("Failed to create SIGTERM listener");
47+
let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
48+
.expect("Failed to create SIGINT listener");
49+
tokio::select! {
50+
_ = sigterm.recv() => {}
51+
_ = sigint.recv() => {}
52+
}
53+
log::info!("Shutdown signal received, shutting down...");
54+
notifier.notify_one();
55+
});
56+
57+
// Initialize the blockchain data client, connect and subscribe to live blocks with RPC
58+
let chain: Chain = match std::env::var("CHAIN") {
59+
Ok(chain_str) => {
60+
if let Ok(blockchain) = chain_str.parse::<Blockchain>() {
61+
match blockchain {
62+
Blockchain::Ethereum => chains::ETHEREUM.clone(),
63+
Blockchain::Base => chains::BASE.clone(),
64+
Blockchain::Arbitrum => chains::ARBITRUM.clone(),
65+
Blockchain::Polygon => chains::POLYGON.clone(),
66+
_ => panic!("Invalid chain {chain_str}"),
67+
}
68+
} else {
69+
panic!("Invalid chain {chain_str}");
70+
}
71+
}
72+
Err(_) => chains::ETHEREUM.clone(), // default
73+
};
74+
let chain = Arc::new(chain);
75+
let http_rpc_url = std::env::var("RPC_HTTP_URL").expect("RPC_HTTP_URL must be set");
76+
let blockchain_adapter_config = BlockchainAdapterConfig::new(http_rpc_url, Some(3), None, true);
77+
let mut data_client = BlockchainDataClient::new(chain, blockchain_adapter_config);
78+
data_client.initialize_cache_database(None).await;
79+
80+
let univ3 = exchanges::ethereum::UNISWAP_V3.clone();
81+
let dex_id = univ3.id();
82+
data_client.connect().await?;
83+
data_client.register_exchange(univ3.clone()).await?;
84+
// Lets use block https://etherscan.io/block/22327045 from (Apr-22-2025 08:49:47 PM +UTC)
85+
let from_block = Some(22327045);
86+
87+
// Main loop to keep the app running
88+
loop {
89+
tokio::select! {
90+
() = notify.notified() => break,
91+
result = data_client.sync_exchange_pools(dex_id.as_str(), from_block) => {
92+
match result {
93+
Ok(_) => {
94+
// Exit after the tokens and pool are synced successfully
95+
log::info!("Successfully synced tokens and pools");
96+
break;
97+
},
98+
Err(e) => {
99+
// Handle error case
100+
log::error!("Error syncing tokens and pools: {}", e);
101+
break;
102+
}
103+
}
104+
}
105+
}
106+
}
107+
data_client.disconnect()?;
108+
Ok(())
109+
}

crates/adapters/blockchain/bin/watch_hypersync_live_blocks.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7171
}
7272
Err(_) => chains::ETHEREUM.clone(), // default
7373
};
74-
let blockchain_adapter_config = BlockchainAdapterConfig::new(None, true);
74+
let chain = Arc::new(chain);
75+
let http_rpc_url = std::env::var("RPC_HTTP_URL").expect("RPC_HTTP_URL must be set");
76+
let blockchain_adapter_config = BlockchainAdapterConfig::new(http_rpc_url, None, None, true);
7577
let mut data_client = BlockchainDataClient::new(chain.clone(), blockchain_adapter_config);
7678
data_client.connect().await?;
7779
data_client.subscribe_blocks().await;

crates/adapters/blockchain/bin/watch_rpc_live_blocks.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7171
}
7272
Err(_) => chains::ETHEREUM.clone(), // default
7373
};
74+
let chain = Arc::new(chain);
7475
let wss_rpc_url = std::env::var("RPC_WSS_URL").expect("RPC_WSS_URL must be set");
75-
let blockchain_adapter_config = BlockchainAdapterConfig::new(Some(wss_rpc_url), false);
76+
let http_rpc_url = std::env::var("RPC_HTTP_URL").expect("RPC_HTTP_URL must be set");
77+
let blockchain_adapter_config =
78+
BlockchainAdapterConfig::new(http_rpc_url, None, Some(wss_rpc_url), false);
7679
let mut data_client = BlockchainDataClient::new(chain.clone(), blockchain_adapter_config);
7780
data_client.connect().await?;
7881
data_client.subscribe_blocks().await;
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// -------------------------------------------------------------------------------------------------
2+
// Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3+
// https://nautechsystems.io
4+
//
5+
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6+
// You may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
// -------------------------------------------------------------------------------------------------
15+
16+
use nautilus_model::defi::{
17+
amm::Pool,
18+
chain::{Chain, SharedChain},
19+
dex::Dex,
20+
token::Token,
21+
};
22+
use sqlx::{PgPool, postgres::PgConnectOptions};
23+
24+
use crate::cache::rows::TokenRow;
25+
26+
/// Database interface for persisting and retrieving blockchain entities and domain objects.
27+
pub struct BlockchainCacheDatabase {
28+
/// PostgreSQL connection pool used for database operations
29+
pool: PgPool,
30+
}
31+
32+
impl BlockchainCacheDatabase {
33+
/// Initializes a new database instance by establishing a connection to PostgreSQL.
34+
pub async fn init(pg_options: PgConnectOptions) -> Self {
35+
let pool = PgPool::connect_with(pg_options)
36+
.await
37+
.expect("Error connecting to Postgres");
38+
Self { pool }
39+
}
40+
41+
/// Seeds the database with a blockchain chain record.
42+
pub async fn seed_chain(&self, chain: &Chain) -> anyhow::Result<()> {
43+
sqlx::query(
44+
r#"
45+
INSERT INTO chain (
46+
chain_id, name
47+
) VALUES ($1,$2)
48+
ON CONFLICT (chain_id)
49+
DO NOTHING
50+
"#,
51+
)
52+
.bind(chain.chain_id as i32)
53+
.bind(chain.name.to_string())
54+
.execute(&self.pool)
55+
.await
56+
.map(|_| ())
57+
.map_err(|e| anyhow::anyhow!("Failed to seed chain table: {e}"))
58+
}
59+
60+
/// Adds or updates a DEX (Decentralized Exchange) record in the database.
61+
pub async fn add_dex(&self, dex: &Dex) -> anyhow::Result<()> {
62+
sqlx::query(
63+
r#"
64+
INSERT INTO dex (
65+
chain_id, name, factory_address
66+
) VALUES ($1, $2, $3)
67+
ON CONFLICT (chain_id, name)
68+
DO UPDATE
69+
SET
70+
factory_address = $3
71+
"#,
72+
)
73+
.bind(dex.chain.chain_id as i32)
74+
.bind(dex.name.as_ref())
75+
.bind(dex.factory.as_ref())
76+
.execute(&self.pool)
77+
.await
78+
.map(|_| ())
79+
.map_err(|e| anyhow::anyhow!("Failed to insert into dex table: {e}"))
80+
}
81+
82+
/// Adds or updates a liquidity pool/pair record in the database.
83+
pub async fn add_pool(&self, pool: &Pool) -> anyhow::Result<()> {
84+
sqlx::query(
85+
r#"
86+
INSERT INTO pool (
87+
chain_id, address, dex_name, creation_block,
88+
token0_chain, token0_address,
89+
token1_chain, token1_address,
90+
fee, tick_spacing
91+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
92+
ON CONFLICT (chain_id, address)
93+
DO UPDATE
94+
SET
95+
dex_name = $3,
96+
creation_block = $4,
97+
token0_chain = $5,
98+
token0_address = $6,
99+
token1_chain = $7,
100+
token1_address = $8,
101+
fee = $9,
102+
tick_spacing = $10
103+
"#,
104+
)
105+
.bind(pool.chain.chain_id as i32)
106+
.bind(pool.address.as_str())
107+
.bind(pool.dex.name.as_ref())
108+
.bind(pool.creation_block as i64)
109+
.bind(pool.token0.chain.chain_id as i32)
110+
.bind(pool.token0.address.as_str())
111+
.bind(pool.token1.chain.chain_id as i32)
112+
.bind(pool.token1.address.as_str())
113+
.bind(pool.fee as i32)
114+
.bind(pool.tick_spacing as i32)
115+
.execute(&self.pool)
116+
.await
117+
.map(|_| ())
118+
.map_err(|e| anyhow::anyhow!("Failed to insert into pool table: {e}"))
119+
}
120+
121+
/// Adds or updates a token record in the database.
122+
pub async fn add_token(&self, token: &Token) -> anyhow::Result<()> {
123+
sqlx::query(
124+
r#"
125+
INSERT INTO token (
126+
chain_id, address, name, symbol, decimals
127+
) VALUES ($1, $2, $3, $4, $5)
128+
ON CONFLICT (chain_id, address)
129+
DO UPDATE
130+
SET
131+
name = $3,
132+
symbol = $4,
133+
decimals = $5
134+
"#,
135+
)
136+
.bind(token.chain.chain_id as i32)
137+
.bind(token.address.as_str())
138+
.bind(token.name.as_str())
139+
.bind(token.symbol.as_str())
140+
.bind(token.decimals as i32)
141+
.execute(&self.pool)
142+
.await
143+
.map(|_| ())
144+
.map_err(|e| anyhow::anyhow!("Failed to insert into token table: {e}"))
145+
}
146+
147+
/// Retrieves all token records for the given chain and converts them into `Token` domain objects.
148+
pub async fn load_tokens(&self, chain: SharedChain) -> anyhow::Result<Vec<Token>> {
149+
sqlx::query_as::<_, TokenRow>("SELECT * FROM token WHERE chain_id = $1")
150+
.bind(chain.chain_id as i32)
151+
.fetch_all(&self.pool)
152+
.await
153+
.map(|rows| {
154+
rows.into_iter()
155+
.map(|token_row| {
156+
Token::new(
157+
chain.clone(),
158+
token_row.address,
159+
token_row.name,
160+
token_row.symbol,
161+
token_row.decimals as u8,
162+
)
163+
})
164+
.collect::<Vec<_>>()
165+
})
166+
.map_err(|e| anyhow::anyhow!("Failed to load tokens: {e}"))
167+
}
168+
}

0 commit comments

Comments
 (0)