From a9ebb43ac68899077ad74b70c61031926869a5c8 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Fri, 25 Apr 2025 13:49:26 +0400 Subject: [PATCH 01/11] node: Refactor main execution flow and introduce launcher module --- node/src/launcher.rs | 592 +++++++++++++++++++++++++++++++++++++++++++ node/src/lib.rs | 4 +- node/src/main.rs | 587 +----------------------------------------- 3 files changed, 598 insertions(+), 585 deletions(-) create mode 100644 node/src/launcher.rs diff --git a/node/src/launcher.rs b/node/src/launcher.rs new file mode 100644 index 00000000000..5357926a8f2 --- /dev/null +++ b/node/src/launcher.rs @@ -0,0 +1,592 @@ +use anyhow::Result; + +use git_testament::{git_testament, render_testament}; +use graph::futures01::Future as _; +use graph::futures03::compat::Future01CompatExt; +use graph::futures03::future::TryFutureExt; + +use crate::config::Config; +use crate::network_setup::Networks; +use crate::opt::Opt; +use crate::store_builder::StoreBuilder; +use graph::blockchain::{Blockchain, BlockchainKind}; +use graph::components::link_resolver::{ArweaveClient, FileSizeLimit}; +use graph::components::subgraph::Settings; +use graph::data::graphql::load_manager::LoadManager; +use graph::endpoint::EndpointMetrics; +use graph::env::EnvVars; +use graph::log::logger; +use graph::prelude::*; +use graph::prometheus::Registry; +use graph::url::Url; +use graph_core::polling_monitor::{arweave_service, ipfs_service}; +use graph_core::{ + SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager, + SubgraphRegistrar as IpfsSubgraphRegistrar, +}; +use graph_graphql::prelude::GraphQlRunner; +use graph_server_http::GraphQLServer as GraphQLQueryServer; +use graph_server_index_node::IndexNodeServer; +use graph_server_json_rpc::JsonRpcServer; +use graph_server_metrics::PrometheusMetricsServer; +use graph_store_postgres::{ + register_jobs as register_store_jobs, ConnectionPool, NotificationSender, Store, +}; +use graphman_server::GraphmanServer; +use graphman_server::GraphmanServerConfig; +use std::io::{BufRead, BufReader}; +use std::path::Path; +use std::time::Duration; +use tokio::sync::mpsc; + +git_testament!(TESTAMENT); + +fn read_expensive_queries( + logger: &Logger, + expensive_queries_filename: String, +) -> Result>, std::io::Error> { + // A file with a list of expensive queries, one query per line + // Attempts to run these queries will return a + // QueryExecutionError::TooExpensive to clients + let path = Path::new(&expensive_queries_filename); + let mut queries = Vec::new(); + if path.exists() { + info!( + logger, + "Reading expensive queries file: {}", expensive_queries_filename + ); + let file = std::fs::File::open(path)?; + let reader = BufReader::new(file); + for line in reader.lines() { + let line = line?; + let query = q::parse_query(&line) + .map_err(|e| { + let msg = format!( + "invalid GraphQL query in {}: {}\n{}", + expensive_queries_filename, e, line + ); + std::io::Error::new(std::io::ErrorKind::InvalidData, msg) + })? + .into_static(); + queries.push(Arc::new(query)); + } + } else { + warn!( + logger, + "Expensive queries file not set to a valid file: {}", expensive_queries_filename + ); + } + Ok(queries) +} + +pub async fn run(opt: Opt, env_vars: Arc) { + env_logger::init(); + // Set up logger + let logger = logger(opt.debug); + + // Log version information + info!( + logger, + "Graph Node version: {}", + render_testament!(TESTAMENT) + ); + + if !graph_server_index_node::PoiProtection::from_env(&ENV_VARS).is_active() { + warn!( + logger, + "GRAPH_POI_ACCESS_TOKEN not set; might leak POIs to the public via GraphQL" + ); + } + + let config = match Config::load(&logger, &opt.clone().into()) { + Err(e) => { + eprintln!("configuration error: {}", e); + std::process::exit(1); + } + Ok(config) => config, + }; + + let subgraph_settings = match env_vars.subgraph_settings { + Some(ref path) => { + info!(logger, "Reading subgraph configuration file `{}`", path); + match Settings::from_file(path) { + Ok(rules) => rules, + Err(e) => { + eprintln!("configuration error in subgraph settings {}: {}", path, e); + std::process::exit(1); + } + } + } + None => Settings::default(), + }; + + if opt.check_config { + match config.to_json() { + Ok(txt) => println!("{}", txt), + Err(e) => eprintln!("error serializing config: {}", e), + } + eprintln!("Successfully validated configuration"); + std::process::exit(0); + } + + let node_id = NodeId::new(opt.node_id.clone()) + .expect("Node ID must be between 1 and 63 characters in length"); + + // Obtain subgraph related command-line arguments + let subgraph = opt.subgraph.clone(); + + // Obtain ports to use for the GraphQL server(s) + let http_port = opt.http_port; + + // Obtain JSON-RPC server port + let json_rpc_port = opt.admin_port; + + // Obtain index node server port + let index_node_port = opt.index_node_port; + + // Obtain metrics server port + let metrics_port = opt.metrics_port; + + // Obtain the fork base URL + let fork_base = match &opt.fork_base { + Some(url) => { + // Make sure the endpoint ends with a terminating slash. + let url = if !url.ends_with('/') { + let mut url = url.clone(); + url.push('/'); + Url::parse(&url) + } else { + Url::parse(url) + }; + + Some(url.expect("Failed to parse the fork base URL")) + } + None => { + warn!( + logger, + "No fork base URL specified, subgraph forking is disabled" + ); + None + } + }; + + info!(logger, "Starting up"); + + // Optionally, identify the Elasticsearch logging configuration + let elastic_config = opt + .elasticsearch_url + .clone() + .map(|endpoint| ElasticLoggingConfig { + endpoint, + username: opt.elasticsearch_user.clone(), + password: opt.elasticsearch_password.clone(), + client: reqwest::Client::new(), + }); + + // Set up Prometheus registry + let prometheus_registry = Arc::new(Registry::new()); + let metrics_registry = Arc::new(MetricsRegistry::new( + logger.clone(), + prometheus_registry.clone(), + )); + + // Create a component and subgraph logger factory + let logger_factory = + LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone()); + + let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger) + .await + .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}")); + + let ipfs_service = ipfs_service( + ipfs_client.cheap_clone(), + ENV_VARS.mappings.max_ipfs_file_bytes, + ENV_VARS.mappings.ipfs_timeout, + ENV_VARS.mappings.ipfs_request_limit, + ); + + let arweave_resolver = Arc::new(ArweaveClient::new( + logger.cheap_clone(), + opt.arweave + .parse() + .expect("unable to parse arweave gateway address"), + )); + + let arweave_service = arweave_service( + arweave_resolver.cheap_clone(), + env_vars.mappings.ipfs_request_limit, + match env_vars.mappings.max_ipfs_file_bytes { + 0 => FileSizeLimit::Unlimited, + n => FileSizeLimit::MaxBytes(n as u64), + }, + ); + + // Convert the clients into a link resolver. Since we want to get past + // possible temporary DNS failures, make the resolver retry + let link_resolver = Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone())); + let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone()); + + let endpoint_metrics = Arc::new(EndpointMetrics::new( + logger.clone(), + &config.chains.providers(), + metrics_registry.cheap_clone(), + )); + + let graphql_metrics_registry = metrics_registry.clone(); + + let contention_logger = logger.clone(); + + // TODO: make option loadable from configuration TOML and environment: + let expensive_queries = + read_expensive_queries(&logger, opt.expensive_queries_filename).unwrap(); + + let store_builder = StoreBuilder::new( + &logger, + &node_id, + &config, + fork_base, + metrics_registry.cheap_clone(), + ) + .await; + + let primary_pool = store_builder.primary_pool(); + let subscription_manager = store_builder.subscription_manager(); + let chain_head_update_listener = store_builder.chain_head_update_listener(); + let network_store = store_builder.network_store(config.chain_ids()); + + let graphman_server_config = make_graphman_server_config( + primary_pool.clone(), + network_store.cheap_clone(), + metrics_registry.cheap_clone(), + &env_vars, + &logger, + &logger_factory, + ); + + start_graphman_server(opt.graphman_port, graphman_server_config).await; + + let launch_services = |logger: Logger, env_vars: Arc| async move { + use graph::components::network_provider; + + let block_store = network_store.block_store(); + + let mut provider_checks: Vec> = Vec::new(); + + if env_vars.genesis_validation_enabled { + provider_checks.push(Arc::new(network_provider::GenesisHashCheck::new( + block_store.clone(), + ))); + } + + provider_checks.push(Arc::new(network_provider::ExtendedBlocksCheck::new( + env_vars + .firehose_disable_extended_blocks_for_chains + .iter() + .map(|x| x.as_str().into()), + ))); + + let network_adapters = Networks::from_config( + logger.cheap_clone(), + &config, + metrics_registry.cheap_clone(), + endpoint_metrics, + &provider_checks, + ) + .await + .expect("unable to parse network configuration"); + + let blockchain_map = network_adapters + .blockchain_map( + &env_vars, + &node_id, + &logger, + block_store, + &logger_factory, + metrics_registry.cheap_clone(), + chain_head_update_listener, + ) + .await; + + // see comment on cleanup_ethereum_shallow_blocks + if !opt.disable_block_ingestor { + match blockchain_map + .get_all_by_kind::(BlockchainKind::Ethereum) + .ok() + .map(|chains| { + chains + .iter() + .flat_map(|c| { + if !c.chain_client().is_firehose() { + Some(c.name.to_string()) + } else { + None + } + }) + .collect() + }) { + Some(eth_network_names) => { + network_store + .block_store() + .cleanup_ethereum_shallow_blocks(eth_network_names) + .unwrap(); + } + // This code path only happens if the downcast on the blockchain map fails, that + // probably means we have a problem with the chain loading logic so it's probably + // safest to just refuse to start. + None => unreachable!( + "If you are seeing this message just use a different version of graph-node" + ), + } + } + + let blockchain_map = Arc::new(blockchain_map); + + let shards: Vec<_> = config.stores.keys().cloned().collect(); + let load_manager = Arc::new(LoadManager::new( + &logger, + shards, + expensive_queries, + metrics_registry.clone(), + )); + let graphql_runner = Arc::new(GraphQlRunner::new( + &logger, + network_store.clone(), + load_manager, + graphql_metrics_registry, + )); + let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone()); + + let index_node_server = IndexNodeServer::new( + &logger_factory, + blockchain_map.clone(), + network_store.clone(), + link_resolver.clone(), + ); + + if !opt.disable_block_ingestor { + let logger = logger.clone(); + let ingestors = Networks::block_ingestors(&logger, &blockchain_map) + .await + .expect("unable to start block ingestors"); + + ingestors.into_iter().for_each(|ingestor| { + let logger = logger.clone(); + info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str(), "kind" => ingestor.kind().to_string()); + + graph::spawn(ingestor.run()); + }); + + // Start a task runner + let mut job_runner = graph::util::jobs::Runner::new(&logger); + register_store_jobs( + &mut job_runner, + network_store.clone(), + primary_pool, + metrics_registry.clone(), + ); + graph::spawn_blocking(job_runner.start()); + } + let static_filters = ENV_VARS.experimental_static_filters; + + let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone())); + + let subgraph_instance_manager = SubgraphInstanceManager::new( + &logger_factory, + env_vars.cheap_clone(), + network_store.subgraph_store(), + blockchain_map.cheap_clone(), + sg_count.cheap_clone(), + metrics_registry.clone(), + link_resolver.clone(), + ipfs_service, + arweave_service, + static_filters, + ); + + // Create IPFS-based subgraph provider + let subgraph_provider = IpfsSubgraphAssignmentProvider::new( + &logger_factory, + link_resolver.clone(), + subgraph_instance_manager, + sg_count, + ); + + // Check version switching mode environment variable + let version_switching_mode = ENV_VARS.subgraph_version_switching_mode; + + // Create named subgraph provider for resolving subgraph name->ID mappings + let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new( + &logger_factory, + link_resolver, + Arc::new(subgraph_provider), + network_store.subgraph_store(), + subscription_manager, + blockchain_map, + node_id.clone(), + version_switching_mode, + Arc::new(subgraph_settings), + )); + graph::spawn( + subgraph_registrar + .start() + .map_err(|e| panic!("failed to initialize subgraph provider {}", e)) + .compat(), + ); + + // Start admin JSON-RPC server. + let json_rpc_server = JsonRpcServer::serve( + json_rpc_port, + http_port, + subgraph_registrar.clone(), + node_id.clone(), + logger.clone(), + ) + .await + .expect("failed to start JSON-RPC admin server"); + + // Let the server run forever. + std::mem::forget(json_rpc_server); + + // Add the CLI subgraph with a REST request to the admin server. + if let Some(subgraph) = subgraph { + let (name, hash) = if subgraph.contains(':') { + let mut split = subgraph.split(':'); + (split.next().unwrap(), split.next().unwrap().to_owned()) + } else { + ("cli", subgraph) + }; + + let name = SubgraphName::new(name) + .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'"); + let subgraph_id = + DeploymentHash::new(hash).expect("Subgraph hash must be a valid IPFS hash"); + let debug_fork = opt + .debug_fork + .map(DeploymentHash::new) + .map(|h| h.expect("Debug fork hash must be a valid IPFS hash")); + let start_block = opt + .start_block + .map(|block| { + let mut split = block.split(':'); + ( + // BlockHash + split.next().unwrap().to_owned(), + // BlockNumber + split.next().unwrap().parse::().unwrap(), + ) + }) + .map(|(hash, number)| BlockPtr::try_from((hash.as_str(), number))) + .map(Result::unwrap); + + graph::spawn( + async move { + subgraph_registrar.create_subgraph(name.clone()).await?; + subgraph_registrar + .create_subgraph_version( + name, + subgraph_id, + node_id, + debug_fork, + start_block, + None, + None, + ) + .await + } + .map_err(|e| panic!("Failed to deploy subgraph from `--subgraph` flag: {}", e)), + ); + } + + // Serve GraphQL queries over HTTP + graph::spawn(async move { graphql_server.start(http_port).await }); + + // Run the index node server + graph::spawn(async move { index_node_server.start(index_node_port).await }); + + graph::spawn(async move { + metrics_server + .start(metrics_port) + .await + .expect("Failed to start metrics server") + }); + }; + + graph::spawn(launch_services(logger.clone(), env_vars.cheap_clone())); + + // Periodically check for contention in the tokio threadpool. First spawn a + // task that simply responds to "ping" requests. Then spawn a separate + // thread to periodically ping it and check responsiveness. + let (ping_send, mut ping_receive) = mpsc::channel::>(1); + graph::spawn(async move { + while let Some(pong_send) = ping_receive.recv().await { + let _ = pong_send.clone().send(()); + } + panic!("ping sender dropped"); + }); + std::thread::spawn(move || loop { + std::thread::sleep(Duration::from_secs(1)); + let (pong_send, pong_receive) = std::sync::mpsc::sync_channel(1); + if graph::futures03::executor::block_on(ping_send.clone().send(pong_send)).is_err() { + debug!(contention_logger, "Shutting down contention checker thread"); + break; + } + let mut timeout = Duration::from_millis(10); + while pong_receive.recv_timeout(timeout) == Err(std::sync::mpsc::RecvTimeoutError::Timeout) + { + debug!(contention_logger, "Possible contention in tokio threadpool"; + "timeout_ms" => timeout.as_millis(), + "code" => LogCode::TokioContention); + if timeout < ENV_VARS.kill_if_unresponsive_timeout { + timeout *= 10; + } else if ENV_VARS.kill_if_unresponsive { + // The node is unresponsive, kill it in hopes it will be restarted. + crit!(contention_logger, "Node is unresponsive, killing process"); + std::process::abort() + } + } + }); + + graph::futures03::future::pending::<()>().await; +} + +async fn start_graphman_server(port: u16, config: Option>) { + let Some(config) = config else { + return; + }; + + let server = GraphmanServer::new(config) + .unwrap_or_else(|err| panic!("Invalid graphman server configuration: {err:#}")); + + server + .start(port) + .await + .unwrap_or_else(|err| panic!("Failed to start graphman server: {err:#}")); +} + +fn make_graphman_server_config<'a>( + pool: ConnectionPool, + store: Arc, + metrics_registry: Arc, + env_vars: &EnvVars, + logger: &Logger, + logger_factory: &'a LoggerFactory, +) -> Option> { + let Some(auth_token) = &env_vars.graphman_server_auth_token else { + warn!( + logger, + "Missing graphman server auth token; graphman server will not start", + ); + + return None; + }; + + let notification_sender = Arc::new(NotificationSender::new(metrics_registry.clone())); + + Some(GraphmanServerConfig { + pool, + notification_sender, + store, + logger_factory, + auth_token: auth_token.to_owned(), + }) +} diff --git a/node/src/lib.rs b/node/src/lib.rs index f65ffc1be8f..7e15869d941 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -7,12 +7,12 @@ extern crate diesel; pub mod chain; pub mod config; +pub mod launcher; +pub mod manager; pub mod network_setup; pub mod opt; pub mod store_builder; -pub mod manager; - pub struct MetricsContext { pub prometheus: Arc, pub registry: Arc, diff --git a/node/src/main.rs b/node/src/main.rs index 6cd892079c1..1c0cfeda3a4 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -1,82 +1,12 @@ use clap::Parser as _; -use git_testament::{git_testament, render_testament}; -use graph::futures01::Future as _; -use graph::futures03::compat::Future01CompatExt; -use graph::futures03::future::TryFutureExt; +use git_testament::git_testament; -use graph::blockchain::{Blockchain, BlockchainKind}; -use graph::components::link_resolver::{ArweaveClient, FileSizeLimit}; -use graph::components::subgraph::Settings; -use graph::data::graphql::load_manager::LoadManager; -use graph::endpoint::EndpointMetrics; use graph::env::EnvVars; -use graph::log::logger; use graph::prelude::*; -use graph::prometheus::Registry; -use graph::url::Url; -use graph_core::polling_monitor::{arweave_service, ipfs_service}; -use graph_core::{ - SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager, - SubgraphRegistrar as IpfsSubgraphRegistrar, -}; -use graph_graphql::prelude::GraphQlRunner; -use graph_node::config::Config; -use graph_node::network_setup::Networks; -use graph_node::opt; -use graph_node::store_builder::StoreBuilder; -use graph_server_http::GraphQLServer as GraphQLQueryServer; -use graph_server_index_node::IndexNodeServer; -use graph_server_json_rpc::JsonRpcServer; -use graph_server_metrics::PrometheusMetricsServer; -use graph_store_postgres::{ - register_jobs as register_store_jobs, ConnectionPool, NotificationSender, Store, -}; -use graphman_server::GraphmanServer; -use graphman_server::GraphmanServerConfig; -use std::io::{BufRead, BufReader}; -use std::path::Path; -use std::time::Duration; -use tokio::sync::mpsc; -git_testament!(TESTAMENT); +use graph_node::{launcher, opt}; -fn read_expensive_queries( - logger: &Logger, - expensive_queries_filename: String, -) -> Result>, std::io::Error> { - // A file with a list of expensive queries, one query per line - // Attempts to run these queries will return a - // QueryExecutionError::TooExpensive to clients - let path = Path::new(&expensive_queries_filename); - let mut queries = Vec::new(); - if path.exists() { - info!( - logger, - "Reading expensive queries file: {}", expensive_queries_filename - ); - let file = std::fs::File::open(path)?; - let reader = BufReader::new(file); - for line in reader.lines() { - let line = line?; - let query = q::parse_query(&line) - .map_err(|e| { - let msg = format!( - "invalid GraphQL query in {}: {}\n{}", - expensive_queries_filename, e, line - ); - std::io::Error::new(std::io::ErrorKind::InvalidData, msg) - })? - .into_static(); - queries.push(Arc::new(query)); - } - } else { - warn!( - logger, - "Expensive queries file not set to a valid file: {}", expensive_queries_filename - ); - } - Ok(queries) -} +git_testament!(TESTAMENT); fn main() { let max_blocking: usize = std::env::var("GRAPH_MAX_BLOCKING_THREADS") @@ -93,517 +23,8 @@ fn main() { } async fn main_inner() { - env_logger::init(); - let env_vars = Arc::new(EnvVars::from_env().unwrap()); let opt = opt::Opt::parse(); - // Set up logger - let logger = logger(opt.debug); - - // Log version information - info!( - logger, - "Graph Node version: {}", - render_testament!(TESTAMENT) - ); - - if !graph_server_index_node::PoiProtection::from_env(&ENV_VARS).is_active() { - warn!( - logger, - "GRAPH_POI_ACCESS_TOKEN not set; might leak POIs to the public via GraphQL" - ); - } - - let config = match Config::load(&logger, &opt.clone().into()) { - Err(e) => { - eprintln!("configuration error: {}", e); - std::process::exit(1); - } - Ok(config) => config, - }; - - let subgraph_settings = match env_vars.subgraph_settings { - Some(ref path) => { - info!(logger, "Reading subgraph configuration file `{}`", path); - match Settings::from_file(path) { - Ok(rules) => rules, - Err(e) => { - eprintln!("configuration error in subgraph settings {}: {}", path, e); - std::process::exit(1); - } - } - } - None => Settings::default(), - }; - - if opt.check_config { - match config.to_json() { - Ok(txt) => println!("{}", txt), - Err(e) => eprintln!("error serializing config: {}", e), - } - eprintln!("Successfully validated configuration"); - std::process::exit(0); - } - - let node_id = NodeId::new(opt.node_id.clone()) - .expect("Node ID must be between 1 and 63 characters in length"); - - // Obtain subgraph related command-line arguments - let subgraph = opt.subgraph.clone(); - - // Obtain ports to use for the GraphQL server(s) - let http_port = opt.http_port; - - // Obtain JSON-RPC server port - let json_rpc_port = opt.admin_port; - - // Obtain index node server port - let index_node_port = opt.index_node_port; - - // Obtain metrics server port - let metrics_port = opt.metrics_port; - - // Obtain the fork base URL - let fork_base = match &opt.fork_base { - Some(url) => { - // Make sure the endpoint ends with a terminating slash. - let url = if !url.ends_with('/') { - let mut url = url.clone(); - url.push('/'); - Url::parse(&url) - } else { - Url::parse(url) - }; - - Some(url.expect("Failed to parse the fork base URL")) - } - None => { - warn!( - logger, - "No fork base URL specified, subgraph forking is disabled" - ); - None - } - }; - - info!(logger, "Starting up"); - - // Optionally, identify the Elasticsearch logging configuration - let elastic_config = opt - .elasticsearch_url - .clone() - .map(|endpoint| ElasticLoggingConfig { - endpoint, - username: opt.elasticsearch_user.clone(), - password: opt.elasticsearch_password.clone(), - client: reqwest::Client::new(), - }); - - // Set up Prometheus registry - let prometheus_registry = Arc::new(Registry::new()); - let metrics_registry = Arc::new(MetricsRegistry::new( - logger.clone(), - prometheus_registry.clone(), - )); - - // Create a component and subgraph logger factory - let logger_factory = - LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone()); - - let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger) - .await - .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}")); - - let ipfs_service = ipfs_service( - ipfs_client.cheap_clone(), - ENV_VARS.mappings.max_ipfs_file_bytes, - ENV_VARS.mappings.ipfs_timeout, - ENV_VARS.mappings.ipfs_request_limit, - ); - - let arweave_resolver = Arc::new(ArweaveClient::new( - logger.cheap_clone(), - opt.arweave - .parse() - .expect("unable to parse arweave gateway address"), - )); - - let arweave_service = arweave_service( - arweave_resolver.cheap_clone(), - env_vars.mappings.ipfs_request_limit, - match env_vars.mappings.max_ipfs_file_bytes { - 0 => FileSizeLimit::Unlimited, - n => FileSizeLimit::MaxBytes(n as u64), - }, - ); - - // Convert the clients into a link resolver. Since we want to get past - // possible temporary DNS failures, make the resolver retry - let link_resolver = Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone())); - let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone()); - - let endpoint_metrics = Arc::new(EndpointMetrics::new( - logger.clone(), - &config.chains.providers(), - metrics_registry.cheap_clone(), - )); - - let graphql_metrics_registry = metrics_registry.clone(); - - let contention_logger = logger.clone(); - - // TODO: make option loadable from configuration TOML and environment: - let expensive_queries = - read_expensive_queries(&logger, opt.expensive_queries_filename).unwrap(); - - let store_builder = StoreBuilder::new( - &logger, - &node_id, - &config, - fork_base, - metrics_registry.cheap_clone(), - ) - .await; - - let primary_pool = store_builder.primary_pool(); - let subscription_manager = store_builder.subscription_manager(); - let chain_head_update_listener = store_builder.chain_head_update_listener(); - let network_store = store_builder.network_store(config.chain_ids()); - - let graphman_server_config = make_graphman_server_config( - primary_pool.clone(), - network_store.cheap_clone(), - metrics_registry.cheap_clone(), - &env_vars, - &logger, - &logger_factory, - ); - - start_graphman_server(opt.graphman_port, graphman_server_config).await; - - let launch_services = |logger: Logger, env_vars: Arc| async move { - use graph::components::network_provider; - - let block_store = network_store.block_store(); - - let mut provider_checks: Vec> = Vec::new(); - - if env_vars.genesis_validation_enabled { - provider_checks.push(Arc::new(network_provider::GenesisHashCheck::new( - block_store.clone(), - ))); - } - - provider_checks.push(Arc::new(network_provider::ExtendedBlocksCheck::new( - env_vars - .firehose_disable_extended_blocks_for_chains - .iter() - .map(|x| x.as_str().into()), - ))); - - let network_adapters = Networks::from_config( - logger.cheap_clone(), - &config, - metrics_registry.cheap_clone(), - endpoint_metrics, - &provider_checks, - ) - .await - .expect("unable to parse network configuration"); - - let blockchain_map = network_adapters - .blockchain_map( - &env_vars, - &node_id, - &logger, - block_store, - &logger_factory, - metrics_registry.cheap_clone(), - chain_head_update_listener, - ) - .await; - - // see comment on cleanup_ethereum_shallow_blocks - if !opt.disable_block_ingestor { - match blockchain_map - .get_all_by_kind::(BlockchainKind::Ethereum) - .ok() - .map(|chains| { - chains - .iter() - .flat_map(|c| { - if !c.chain_client().is_firehose() { - Some(c.name.to_string()) - } else { - None - } - }) - .collect() - }) { - Some(eth_network_names) => { - network_store - .block_store() - .cleanup_ethereum_shallow_blocks(eth_network_names) - .unwrap(); - } - // This code path only happens if the downcast on the blockchain map fails, that - // probably means we have a problem with the chain loading logic so it's probably - // safest to just refuse to start. - None => unreachable!( - "If you are seeing this message just use a different version of graph-node" - ), - } - } - - let blockchain_map = Arc::new(blockchain_map); - - let shards: Vec<_> = config.stores.keys().cloned().collect(); - let load_manager = Arc::new(LoadManager::new( - &logger, - shards, - expensive_queries, - metrics_registry.clone(), - )); - let graphql_runner = Arc::new(GraphQlRunner::new( - &logger, - network_store.clone(), - load_manager, - graphql_metrics_registry, - )); - let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone()); - - let index_node_server = IndexNodeServer::new( - &logger_factory, - blockchain_map.clone(), - network_store.clone(), - link_resolver.clone(), - ); - - if !opt.disable_block_ingestor { - let logger = logger.clone(); - let ingestors = Networks::block_ingestors(&logger, &blockchain_map) - .await - .expect("unable to start block ingestors"); - - ingestors.into_iter().for_each(|ingestor| { - let logger = logger.clone(); - info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str(), "kind" => ingestor.kind().to_string()); - - graph::spawn(ingestor.run()); - }); - - // Start a task runner - let mut job_runner = graph::util::jobs::Runner::new(&logger); - register_store_jobs( - &mut job_runner, - network_store.clone(), - primary_pool, - metrics_registry.clone(), - ); - graph::spawn_blocking(job_runner.start()); - } - let static_filters = ENV_VARS.experimental_static_filters; - - let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone())); - - let subgraph_instance_manager = SubgraphInstanceManager::new( - &logger_factory, - env_vars.cheap_clone(), - network_store.subgraph_store(), - blockchain_map.cheap_clone(), - sg_count.cheap_clone(), - metrics_registry.clone(), - link_resolver.clone(), - ipfs_service, - arweave_service, - static_filters, - ); - - // Create IPFS-based subgraph provider - let subgraph_provider = IpfsSubgraphAssignmentProvider::new( - &logger_factory, - link_resolver.clone(), - subgraph_instance_manager, - sg_count, - ); - - // Check version switching mode environment variable - let version_switching_mode = ENV_VARS.subgraph_version_switching_mode; - - // Create named subgraph provider for resolving subgraph name->ID mappings - let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new( - &logger_factory, - link_resolver, - Arc::new(subgraph_provider), - network_store.subgraph_store(), - subscription_manager, - blockchain_map, - node_id.clone(), - version_switching_mode, - Arc::new(subgraph_settings), - )); - graph::spawn( - subgraph_registrar - .start() - .map_err(|e| panic!("failed to initialize subgraph provider {}", e)) - .compat(), - ); - - // Start admin JSON-RPC server. - let json_rpc_server = JsonRpcServer::serve( - json_rpc_port, - http_port, - subgraph_registrar.clone(), - node_id.clone(), - logger.clone(), - ) - .await - .expect("failed to start JSON-RPC admin server"); - - // Let the server run forever. - std::mem::forget(json_rpc_server); - - // Add the CLI subgraph with a REST request to the admin server. - if let Some(subgraph) = subgraph { - let (name, hash) = if subgraph.contains(':') { - let mut split = subgraph.split(':'); - (split.next().unwrap(), split.next().unwrap().to_owned()) - } else { - ("cli", subgraph) - }; - - let name = SubgraphName::new(name) - .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'"); - let subgraph_id = - DeploymentHash::new(hash).expect("Subgraph hash must be a valid IPFS hash"); - let debug_fork = opt - .debug_fork - .map(DeploymentHash::new) - .map(|h| h.expect("Debug fork hash must be a valid IPFS hash")); - let start_block = opt - .start_block - .map(|block| { - let mut split = block.split(':'); - ( - // BlockHash - split.next().unwrap().to_owned(), - // BlockNumber - split.next().unwrap().parse::().unwrap(), - ) - }) - .map(|(hash, number)| BlockPtr::try_from((hash.as_str(), number))) - .map(Result::unwrap); - - graph::spawn( - async move { - subgraph_registrar.create_subgraph(name.clone()).await?; - subgraph_registrar - .create_subgraph_version( - name, - subgraph_id, - node_id, - debug_fork, - start_block, - None, - None, - ) - .await - } - .map_err(|e| panic!("Failed to deploy subgraph from `--subgraph` flag: {}", e)), - ); - } - - // Serve GraphQL queries over HTTP - graph::spawn(async move { graphql_server.start(http_port).await }); - - // Run the index node server - graph::spawn(async move { index_node_server.start(index_node_port).await }); - - graph::spawn(async move { - metrics_server - .start(metrics_port) - .await - .expect("Failed to start metrics server") - }); - }; - - graph::spawn(launch_services(logger.clone(), env_vars.cheap_clone())); - - // Periodically check for contention in the tokio threadpool. First spawn a - // task that simply responds to "ping" requests. Then spawn a separate - // thread to periodically ping it and check responsiveness. - let (ping_send, mut ping_receive) = mpsc::channel::>(1); - graph::spawn(async move { - while let Some(pong_send) = ping_receive.recv().await { - let _ = pong_send.clone().send(()); - } - panic!("ping sender dropped"); - }); - std::thread::spawn(move || loop { - std::thread::sleep(Duration::from_secs(1)); - let (pong_send, pong_receive) = std::sync::mpsc::sync_channel(1); - if graph::futures03::executor::block_on(ping_send.clone().send(pong_send)).is_err() { - debug!(contention_logger, "Shutting down contention checker thread"); - break; - } - let mut timeout = Duration::from_millis(10); - while pong_receive.recv_timeout(timeout) == Err(std::sync::mpsc::RecvTimeoutError::Timeout) - { - debug!(contention_logger, "Possible contention in tokio threadpool"; - "timeout_ms" => timeout.as_millis(), - "code" => LogCode::TokioContention); - if timeout < ENV_VARS.kill_if_unresponsive_timeout { - timeout *= 10; - } else if ENV_VARS.kill_if_unresponsive { - // The node is unresponsive, kill it in hopes it will be restarted. - crit!(contention_logger, "Node is unresponsive, killing process"); - std::process::abort() - } - } - }); - - graph::futures03::future::pending::<()>().await; -} - -async fn start_graphman_server(port: u16, config: Option>) { - let Some(config) = config else { - return; - }; - - let server = GraphmanServer::new(config) - .unwrap_or_else(|err| panic!("Invalid graphman server configuration: {err:#}")); - - server - .start(port) - .await - .unwrap_or_else(|err| panic!("Failed to start graphman server: {err:#}")); -} - -fn make_graphman_server_config<'a>( - pool: ConnectionPool, - store: Arc, - metrics_registry: Arc, - env_vars: &EnvVars, - logger: &Logger, - logger_factory: &'a LoggerFactory, -) -> Option> { - let Some(auth_token) = &env_vars.graphman_server_auth_token else { - warn!( - logger, - "Missing graphman server auth token; graphman server will not start", - ); - - return None; - }; - - let notification_sender = Arc::new(NotificationSender::new(metrics_registry.clone())); - - Some(GraphmanServerConfig { - pool, - notification_sender, - store, - logger_factory, - auth_token: auth_token.to_owned(), - }) + launcher::run(opt, env_vars).await; } From 861f0aa0a4a43d5854aa67329ced03a7b1c382fe Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 18:24:08 +0400 Subject: [PATCH 02/11] node/launcher: extract setup_configuration helper from run --- node/src/launcher.rs | 194 +++++++++++++++++++++++-------------------- 1 file changed, 103 insertions(+), 91 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 5357926a8f2..24d2e26c5c2 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -41,44 +41,6 @@ use tokio::sync::mpsc; git_testament!(TESTAMENT); -fn read_expensive_queries( - logger: &Logger, - expensive_queries_filename: String, -) -> Result>, std::io::Error> { - // A file with a list of expensive queries, one query per line - // Attempts to run these queries will return a - // QueryExecutionError::TooExpensive to clients - let path = Path::new(&expensive_queries_filename); - let mut queries = Vec::new(); - if path.exists() { - info!( - logger, - "Reading expensive queries file: {}", expensive_queries_filename - ); - let file = std::fs::File::open(path)?; - let reader = BufReader::new(file); - for line in reader.lines() { - let line = line?; - let query = q::parse_query(&line) - .map_err(|e| { - let msg = format!( - "invalid GraphQL query in {}: {}\n{}", - expensive_queries_filename, e, line - ); - std::io::Error::new(std::io::ErrorKind::InvalidData, msg) - })? - .into_static(); - queries.push(Arc::new(query)); - } - } else { - warn!( - logger, - "Expensive queries file not set to a valid file: {}", expensive_queries_filename - ); - } - Ok(queries) -} - pub async fn run(opt: Opt, env_vars: Arc) { env_logger::init(); // Set up logger @@ -98,36 +60,8 @@ pub async fn run(opt: Opt, env_vars: Arc) { ); } - let config = match Config::load(&logger, &opt.clone().into()) { - Err(e) => { - eprintln!("configuration error: {}", e); - std::process::exit(1); - } - Ok(config) => config, - }; - - let subgraph_settings = match env_vars.subgraph_settings { - Some(ref path) => { - info!(logger, "Reading subgraph configuration file `{}`", path); - match Settings::from_file(path) { - Ok(rules) => rules, - Err(e) => { - eprintln!("configuration error in subgraph settings {}: {}", path, e); - std::process::exit(1); - } - } - } - None => Settings::default(), - }; - - if opt.check_config { - match config.to_json() { - Ok(txt) => println!("{}", txt), - Err(e) => eprintln!("error serializing config: {}", e), - } - eprintln!("Successfully validated configuration"); - std::process::exit(0); - } + // Get configuration + let (config, subgraph_settings, fork_base) = setup_configuration(&opt, &logger, &env_vars); let node_id = NodeId::new(opt.node_id.clone()) .expect("Node ID must be between 1 and 63 characters in length"); @@ -147,29 +81,6 @@ pub async fn run(opt: Opt, env_vars: Arc) { // Obtain metrics server port let metrics_port = opt.metrics_port; - // Obtain the fork base URL - let fork_base = match &opt.fork_base { - Some(url) => { - // Make sure the endpoint ends with a terminating slash. - let url = if !url.ends_with('/') { - let mut url = url.clone(); - url.push('/'); - Url::parse(&url) - } else { - Url::parse(url) - }; - - Some(url.expect("Failed to parse the fork base URL")) - } - None => { - warn!( - logger, - "No fork base URL specified, subgraph forking is disabled" - ); - None - } - }; - info!(logger, "Starting up"); // Optionally, identify the Elasticsearch logging configuration @@ -549,6 +460,69 @@ pub async fn run(opt: Opt, env_vars: Arc) { graph::futures03::future::pending::<()>().await; } +/// Sets up and loads configuration based on command line options +fn setup_configuration( + opt: &Opt, + logger: &Logger, + env_vars: &Arc, +) -> (Config, Settings, Option) { + let config = match Config::load(logger, &opt.clone().into()) { + Err(e) => { + eprintln!("configuration error: {}", e); + std::process::exit(1); + } + Ok(config) => config, + }; + + let subgraph_settings = match env_vars.subgraph_settings { + Some(ref path) => { + info!(logger, "Reading subgraph configuration file `{}`", path); + match Settings::from_file(path) { + Ok(rules) => rules, + Err(e) => { + eprintln!("configuration error in subgraph settings {}: {}", path, e); + std::process::exit(1); + } + } + } + None => Settings::default(), + }; + + if opt.check_config { + match config.to_json() { + Ok(txt) => println!("{}", txt), + Err(e) => eprintln!("error serializing config: {}", e), + } + eprintln!("Successfully validated configuration"); + std::process::exit(0); + } + + // Obtain the fork base URL + let fork_base = match &opt.fork_base { + Some(url) => { + // Make sure the endpoint ends with a terminating slash. + let url = if !url.ends_with('/') { + let mut url = url.clone(); + url.push('/'); + Url::parse(&url) + } else { + Url::parse(url) + }; + + Some(url.expect("Failed to parse the fork base URL")) + } + None => { + warn!( + logger, + "No fork base URL specified, subgraph forking is disabled" + ); + None + } + }; + + (config, subgraph_settings, fork_base) +} + async fn start_graphman_server(port: u16, config: Option>) { let Some(config) = config else { return; @@ -590,3 +564,41 @@ fn make_graphman_server_config<'a>( auth_token: auth_token.to_owned(), }) } + +fn read_expensive_queries( + logger: &Logger, + expensive_queries_filename: String, +) -> Result>, std::io::Error> { + // A file with a list of expensive queries, one query per line + // Attempts to run these queries will return a + // QueryExecutionError::TooExpensive to clients + let path = Path::new(&expensive_queries_filename); + let mut queries = Vec::new(); + if path.exists() { + info!( + logger, + "Reading expensive queries file: {}", expensive_queries_filename + ); + let file = std::fs::File::open(path)?; + let reader = BufReader::new(file); + for line in reader.lines() { + let line = line?; + let query = q::parse_query(&line) + .map_err(|e| { + let msg = format!( + "invalid GraphQL query in {}: {}\n{}", + expensive_queries_filename, e, line + ); + std::io::Error::new(std::io::ErrorKind::InvalidData, msg) + })? + .into_static(); + queries.push(Arc::new(query)); + } + } else { + warn!( + logger, + "Expensive queries file not set to a valid file: {}", expensive_queries_filename + ); + } + Ok(queries) +} From a914c7966fbd9083305dcf1dad5f29ea60187249 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 18:26:33 +0400 Subject: [PATCH 03/11] node/launcher: extract setup_metrics helper from run --- node/src/launcher.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 24d2e26c5c2..ace750ea487 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -41,6 +41,18 @@ use tokio::sync::mpsc; git_testament!(TESTAMENT); +/// Sets up metrics and monitoring +fn setup_metrics(logger: &Logger) -> (Arc, Arc) { + // Set up Prometheus registry + let prometheus_registry = Arc::new(Registry::new()); + let metrics_registry = Arc::new(MetricsRegistry::new( + logger.clone(), + prometheus_registry.clone(), + )); + + (prometheus_registry, metrics_registry) +} + pub async fn run(opt: Opt, env_vars: Arc) { env_logger::init(); // Set up logger @@ -83,6 +95,9 @@ pub async fn run(opt: Opt, env_vars: Arc) { info!(logger, "Starting up"); + // Set up metrics + let (prometheus_registry, metrics_registry) = setup_metrics(&logger); + // Optionally, identify the Elasticsearch logging configuration let elastic_config = opt .elasticsearch_url @@ -94,13 +109,6 @@ pub async fn run(opt: Opt, env_vars: Arc) { client: reqwest::Client::new(), }); - // Set up Prometheus registry - let prometheus_registry = Arc::new(Registry::new()); - let metrics_registry = Arc::new(MetricsRegistry::new( - logger.clone(), - prometheus_registry.clone(), - )); - // Create a component and subgraph logger factory let logger_factory = LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone()); From 95c104fbbd51ff92635bc25f3a1bb53b6cf28e49 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 18:33:41 +0400 Subject: [PATCH 04/11] node/launcher: extract setup_store helper from run --- node/src/launcher.rs | 60 +++++++++++++++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index ace750ea487..ff48711c408 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -30,7 +30,8 @@ use graph_server_index_node::IndexNodeServer; use graph_server_json_rpc::JsonRpcServer; use graph_server_metrics::PrometheusMetricsServer; use graph_store_postgres::{ - register_jobs as register_store_jobs, ConnectionPool, NotificationSender, Store, + register_jobs as register_store_jobs, ChainHeadUpdateListener, ConnectionPool, + NotificationSender, Store, SubscriptionManager, }; use graphman_server::GraphmanServer; use graphman_server::GraphmanServerConfig; @@ -53,6 +54,41 @@ fn setup_metrics(logger: &Logger) -> (Arc, Arc) { (prometheus_registry, metrics_registry) } +/// Sets up the store and database connections +async fn setup_store( + logger: &Logger, + node_id: &NodeId, + config: &Config, + fork_base: Option, + metrics_registry: Arc, +) -> ( + ConnectionPool, + Arc, + Arc, + Arc, +) { + let store_builder = StoreBuilder::new( + logger, + node_id, + config, + fork_base, + metrics_registry.cheap_clone(), + ) + .await; + + let primary_pool = store_builder.primary_pool(); + let subscription_manager = store_builder.subscription_manager(); + let chain_head_update_listener = store_builder.chain_head_update_listener(); + let network_store = store_builder.network_store(config.chain_ids()); + + ( + primary_pool, + subscription_manager, + chain_head_update_listener, + network_store, + ) +} + pub async fn run(opt: Opt, env_vars: Arc) { env_logger::init(); // Set up logger @@ -159,19 +195,15 @@ pub async fn run(opt: Opt, env_vars: Arc) { let expensive_queries = read_expensive_queries(&logger, opt.expensive_queries_filename).unwrap(); - let store_builder = StoreBuilder::new( - &logger, - &node_id, - &config, - fork_base, - metrics_registry.cheap_clone(), - ) - .await; - - let primary_pool = store_builder.primary_pool(); - let subscription_manager = store_builder.subscription_manager(); - let chain_head_update_listener = store_builder.chain_head_update_listener(); - let network_store = store_builder.network_store(config.chain_ids()); + let (primary_pool, subscription_manager, chain_head_update_listener, network_store) = + setup_store( + &logger, + &node_id, + &config, + fork_base, + metrics_registry.cheap_clone(), + ) + .await; let graphman_server_config = make_graphman_server_config( primary_pool.clone(), From a7ad5e98f95e4a220a509f6722a72cc39f3de94d Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 18:51:43 +0400 Subject: [PATCH 05/11] node/launcher: extract build_blockchain_map helper from run --- node/src/launcher.rs | 104 ++++++++++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 40 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index ff48711c408..4d3702835fb 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -9,7 +9,7 @@ use crate::config::Config; use crate::network_setup::Networks; use crate::opt::Opt; use crate::store_builder::StoreBuilder; -use graph::blockchain::{Blockchain, BlockchainKind}; +use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; use graph::components::link_resolver::{ArweaveClient, FileSizeLimit}; use graph::components::subgraph::Settings; use graph::data::graphql::load_manager::LoadManager; @@ -89,6 +89,60 @@ async fn setup_store( ) } +async fn build_blockchain_map( + logger: &Logger, + config: &Config, + env_vars: &Arc, + node_id: &NodeId, + network_store: Arc, + metrics_registry: Arc, + endpoint_metrics: Arc, + chain_head_update_listener: Arc, + logger_factory: &LoggerFactory, +) -> Arc { + use graph::components::network_provider; + let block_store = network_store.block_store(); + + let mut provider_checks: Vec> = Vec::new(); + + if env_vars.genesis_validation_enabled { + provider_checks.push(Arc::new(network_provider::GenesisHashCheck::new( + block_store.clone(), + ))); + } + + provider_checks.push(Arc::new(network_provider::ExtendedBlocksCheck::new( + env_vars + .firehose_disable_extended_blocks_for_chains + .iter() + .map(|x| x.as_str().into()), + ))); + + let network_adapters = Networks::from_config( + logger.cheap_clone(), + &config, + metrics_registry.cheap_clone(), + endpoint_metrics, + &provider_checks, + ) + .await + .expect("unable to parse network configuration"); + + let blockchain_map = network_adapters + .blockchain_map( + &env_vars, + &node_id, + &logger, + block_store, + &logger_factory, + metrics_registry.cheap_clone(), + chain_head_update_listener, + ) + .await; + + Arc::new(blockchain_map) +} + pub async fn run(opt: Opt, env_vars: Arc) { env_logger::init(); // Set up logger @@ -217,46 +271,18 @@ pub async fn run(opt: Opt, env_vars: Arc) { start_graphman_server(opt.graphman_port, graphman_server_config).await; let launch_services = |logger: Logger, env_vars: Arc| async move { - use graph::components::network_provider; - - let block_store = network_store.block_store(); - - let mut provider_checks: Vec> = Vec::new(); - - if env_vars.genesis_validation_enabled { - provider_checks.push(Arc::new(network_provider::GenesisHashCheck::new( - block_store.clone(), - ))); - } - - provider_checks.push(Arc::new(network_provider::ExtendedBlocksCheck::new( - env_vars - .firehose_disable_extended_blocks_for_chains - .iter() - .map(|x| x.as_str().into()), - ))); - - let network_adapters = Networks::from_config( - logger.cheap_clone(), + let blockchain_map = build_blockchain_map( + &logger, &config, - metrics_registry.cheap_clone(), + &env_vars, + &node_id, + network_store.clone(), + metrics_registry.clone(), endpoint_metrics, - &provider_checks, + chain_head_update_listener, + &logger_factory, ) - .await - .expect("unable to parse network configuration"); - - let blockchain_map = network_adapters - .blockchain_map( - &env_vars, - &node_id, - &logger, - block_store, - &logger_factory, - metrics_registry.cheap_clone(), - chain_head_update_listener, - ) - .await; + .await; // see comment on cleanup_ethereum_shallow_blocks if !opt.disable_block_ingestor { @@ -290,8 +316,6 @@ pub async fn run(opt: Opt, env_vars: Arc) { } } - let blockchain_map = Arc::new(blockchain_map); - let shards: Vec<_> = config.stores.keys().cloned().collect(); let load_manager = Arc::new(LoadManager::new( &logger, From a6b03cb406c1bfbbe972b9a13df14627503ee1c8 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 18:57:38 +0400 Subject: [PATCH 06/11] node/launcher: extract cleanup_ethereum_shallow_blocks helper from run --- node/src/launcher.rs | 60 +++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 4d3702835fb..4cd1caa1dc8 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -143,6 +143,37 @@ async fn build_blockchain_map( Arc::new(blockchain_map) } +fn cleanup_ethereum_shallow_blocks(blockchain_map: &BlockchainMap, network_store: &Arc) { + match blockchain_map + .get_all_by_kind::(BlockchainKind::Ethereum) + .ok() + .map(|chains| { + chains + .iter() + .flat_map(|c| { + if !c.chain_client().is_firehose() { + Some(c.name.to_string()) + } else { + None + } + }) + .collect() + }) { + Some(eth_network_names) => { + network_store + .block_store() + .cleanup_ethereum_shallow_blocks(eth_network_names) + .unwrap(); + } + // This code path only happens if the downcast on the blockchain map fails, that + // probably means we have a problem with the chain loading logic so it's probably + // safest to just refuse to start. + None => unreachable!( + "If you are seeing this message just use a different version of graph-node" + ), + } +} + pub async fn run(opt: Opt, env_vars: Arc) { env_logger::init(); // Set up logger @@ -286,34 +317,7 @@ pub async fn run(opt: Opt, env_vars: Arc) { // see comment on cleanup_ethereum_shallow_blocks if !opt.disable_block_ingestor { - match blockchain_map - .get_all_by_kind::(BlockchainKind::Ethereum) - .ok() - .map(|chains| { - chains - .iter() - .flat_map(|c| { - if !c.chain_client().is_firehose() { - Some(c.name.to_string()) - } else { - None - } - }) - .collect() - }) { - Some(eth_network_names) => { - network_store - .block_store() - .cleanup_ethereum_shallow_blocks(eth_network_names) - .unwrap(); - } - // This code path only happens if the downcast on the blockchain map fails, that - // probably means we have a problem with the chain loading logic so it's probably - // safest to just refuse to start. - None => unreachable!( - "If you are seeing this message just use a different version of graph-node" - ), - } + cleanup_ethereum_shallow_blocks(&blockchain_map, &network_store); } let shards: Vec<_> = config.stores.keys().cloned().collect(); From 45a4da2d9b31e78af0369a5e6053f7b99a3f1448 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 19:06:24 +0400 Subject: [PATCH 07/11] node/launcher: extract spawn_block_ingestor helper from run --- node/src/launcher.rs | 58 +++++++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 4cd1caa1dc8..d8ef0a62b52 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -174,6 +174,36 @@ fn cleanup_ethereum_shallow_blocks(blockchain_map: &BlockchainMap, network_store } } +async fn spawn_block_ingestor( + logger: &Logger, + blockchain_map: &Arc, + network_store: &Arc, + primary_pool: ConnectionPool, + metrics_registry: &Arc, +) { + let logger = logger.clone(); + let ingestors = Networks::block_ingestors(&logger, &blockchain_map) + .await + .expect("unable to start block ingestors"); + + ingestors.into_iter().for_each(|ingestor| { + let logger = logger.clone(); + info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str(), "kind" => ingestor.kind().to_string()); + + graph::spawn(ingestor.run()); + }); + + // Start a task runner + let mut job_runner = graph::util::jobs::Runner::new(&logger); + register_store_jobs( + &mut job_runner, + network_store.clone(), + primary_pool, + metrics_registry.clone(), + ); + graph::spawn_blocking(job_runner.start()); +} + pub async fn run(opt: Opt, env_vars: Arc) { env_logger::init(); // Set up logger @@ -343,28 +373,16 @@ pub async fn run(opt: Opt, env_vars: Arc) { ); if !opt.disable_block_ingestor { - let logger = logger.clone(); - let ingestors = Networks::block_ingestors(&logger, &blockchain_map) - .await - .expect("unable to start block ingestors"); - - ingestors.into_iter().for_each(|ingestor| { - let logger = logger.clone(); - info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str(), "kind" => ingestor.kind().to_string()); - - graph::spawn(ingestor.run()); - }); - - // Start a task runner - let mut job_runner = graph::util::jobs::Runner::new(&logger); - register_store_jobs( - &mut job_runner, - network_store.clone(), + spawn_block_ingestor( + &logger, + &blockchain_map, + &network_store, primary_pool, - metrics_registry.clone(), - ); - graph::spawn_blocking(job_runner.start()); + &metrics_registry, + ) + .await; } + let static_filters = ENV_VARS.experimental_static_filters; let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone())); From 0a3e5780b74f012dd3f433f23ad2536ab59e7568 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 19:22:20 +0400 Subject: [PATCH 08/11] node/launcher: extract deploy_subgraph_from_flag helper from run --- node/src/launcher.rs | 104 ++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 47 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index d8ef0a62b52..8bb3f88e910 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -204,6 +204,61 @@ async fn spawn_block_ingestor( graph::spawn_blocking(job_runner.start()); } +fn deploy_subgraph_from_flag( + subgraph: String, + opt: &Opt, + subgraph_registrar: Arc, + node_id: NodeId, +) { + let (name, hash) = if subgraph.contains(':') { + let mut split = subgraph.split(':'); + (split.next().unwrap(), split.next().unwrap().to_owned()) + } else { + ("cli", subgraph) + }; + + let name = SubgraphName::new(name) + .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'"); + let subgraph_id = DeploymentHash::new(hash).expect("Subgraph hash must be a valid IPFS hash"); + let debug_fork = opt + .debug_fork + .clone() + .map(DeploymentHash::new) + .map(|h| h.expect("Debug fork hash must be a valid IPFS hash")); + let start_block = opt + .start_block + .clone() + .map(|block| { + let mut split = block.split(':'); + ( + // BlockHash + split.next().unwrap().to_owned(), + // BlockNumber + split.next().unwrap().parse::().unwrap(), + ) + }) + .map(|(hash, number)| BlockPtr::try_from((hash.as_str(), number))) + .map(Result::unwrap); + + graph::spawn( + async move { + subgraph_registrar.create_subgraph(name.clone()).await?; + subgraph_registrar + .create_subgraph_version( + name, + subgraph_id, + node_id, + debug_fork, + start_block, + None, + None, + ) + .await + } + .map_err(|e| panic!("Failed to deploy subgraph from `--subgraph` flag: {}", e)), + ); +} + pub async fn run(opt: Opt, env_vars: Arc) { env_logger::init(); // Set up logger @@ -308,7 +363,7 @@ pub async fn run(opt: Opt, env_vars: Arc) { // TODO: make option loadable from configuration TOML and environment: let expensive_queries = - read_expensive_queries(&logger, opt.expensive_queries_filename).unwrap(); + read_expensive_queries(&logger, opt.expensive_queries_filename.clone()).unwrap(); let (primary_pool, subscription_manager, chain_head_update_listener, network_store) = setup_store( @@ -446,52 +501,7 @@ pub async fn run(opt: Opt, env_vars: Arc) { // Add the CLI subgraph with a REST request to the admin server. if let Some(subgraph) = subgraph { - let (name, hash) = if subgraph.contains(':') { - let mut split = subgraph.split(':'); - (split.next().unwrap(), split.next().unwrap().to_owned()) - } else { - ("cli", subgraph) - }; - - let name = SubgraphName::new(name) - .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'"); - let subgraph_id = - DeploymentHash::new(hash).expect("Subgraph hash must be a valid IPFS hash"); - let debug_fork = opt - .debug_fork - .map(DeploymentHash::new) - .map(|h| h.expect("Debug fork hash must be a valid IPFS hash")); - let start_block = opt - .start_block - .map(|block| { - let mut split = block.split(':'); - ( - // BlockHash - split.next().unwrap().to_owned(), - // BlockNumber - split.next().unwrap().parse::().unwrap(), - ) - }) - .map(|(hash, number)| BlockPtr::try_from((hash.as_str(), number))) - .map(Result::unwrap); - - graph::spawn( - async move { - subgraph_registrar.create_subgraph(name.clone()).await?; - subgraph_registrar - .create_subgraph_version( - name, - subgraph_id, - node_id, - debug_fork, - start_block, - None, - None, - ) - .await - } - .map_err(|e| panic!("Failed to deploy subgraph from `--subgraph` flag: {}", e)), - ); + deploy_subgraph_from_flag(subgraph, &opt, subgraph_registrar.clone(), node_id); } // Serve GraphQL queries over HTTP From 12ac0091677676a59dc0bc9369fc33f2cb99ce44 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 19:28:08 +0400 Subject: [PATCH 09/11] node/launcher: extract spawn_contention_checker helper from run --- node/src/launcher.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 8bb3f88e910..0652575eb27 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -359,8 +359,6 @@ pub async fn run(opt: Opt, env_vars: Arc) { let graphql_metrics_registry = metrics_registry.clone(); - let contention_logger = logger.clone(); - // TODO: make option loadable from configuration TOML and environment: let expensive_queries = read_expensive_queries(&logger, opt.expensive_queries_filename.clone()).unwrap(); @@ -520,6 +518,12 @@ pub async fn run(opt: Opt, env_vars: Arc) { graph::spawn(launch_services(logger.clone(), env_vars.cheap_clone())); + spawn_contention_checker(logger.clone()); + + graph::futures03::future::pending::<()>().await; +} + +fn spawn_contention_checker(logger: Logger) { // Periodically check for contention in the tokio threadpool. First spawn a // task that simply responds to "ping" requests. Then spawn a separate // thread to periodically ping it and check responsiveness. @@ -534,26 +538,24 @@ pub async fn run(opt: Opt, env_vars: Arc) { std::thread::sleep(Duration::from_secs(1)); let (pong_send, pong_receive) = std::sync::mpsc::sync_channel(1); if graph::futures03::executor::block_on(ping_send.clone().send(pong_send)).is_err() { - debug!(contention_logger, "Shutting down contention checker thread"); + debug!(logger, "Shutting down contention checker thread"); break; } let mut timeout = Duration::from_millis(10); while pong_receive.recv_timeout(timeout) == Err(std::sync::mpsc::RecvTimeoutError::Timeout) { - debug!(contention_logger, "Possible contention in tokio threadpool"; + debug!(logger, "Possible contention in tokio threadpool"; "timeout_ms" => timeout.as_millis(), "code" => LogCode::TokioContention); if timeout < ENV_VARS.kill_if_unresponsive_timeout { timeout *= 10; } else if ENV_VARS.kill_if_unresponsive { // The node is unresponsive, kill it in hopes it will be restarted. - crit!(contention_logger, "Node is unresponsive, killing process"); + crit!(logger, "Node is unresponsive, killing process"); std::process::abort() } } }); - - graph::futures03::future::pending::<()>().await; } /// Sets up and loads configuration based on command line options From f4c6cadf3090aa9ffcd841c9388d3d020e85a71c Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 19:45:57 +0400 Subject: [PATCH 10/11] node/launcher: extract build_graphql_server helper from run --- node/src/launcher.rs | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 0652575eb27..69b5e848be9 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -357,8 +357,6 @@ pub async fn run(opt: Opt, env_vars: Arc) { metrics_registry.cheap_clone(), )); - let graphql_metrics_registry = metrics_registry.clone(); - // TODO: make option loadable from configuration TOML and environment: let expensive_queries = read_expensive_queries(&logger, opt.expensive_queries_filename.clone()).unwrap(); @@ -403,20 +401,14 @@ pub async fn run(opt: Opt, env_vars: Arc) { cleanup_ethereum_shallow_blocks(&blockchain_map, &network_store); } - let shards: Vec<_> = config.stores.keys().cloned().collect(); - let load_manager = Arc::new(LoadManager::new( + let graphql_server = build_graphql_server( + &config, &logger, - shards, expensive_queries, metrics_registry.clone(), - )); - let graphql_runner = Arc::new(GraphQlRunner::new( - &logger, - network_store.clone(), - load_manager, - graphql_metrics_registry, - )); - let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone()); + &network_store, + &logger_factory, + ); let index_node_server = IndexNodeServer::new( &logger_factory, @@ -523,6 +515,32 @@ pub async fn run(opt: Opt, env_vars: Arc) { graph::futures03::future::pending::<()>().await; } +fn build_graphql_server( + config: &Config, + logger: &Logger, + expensive_queries: Vec>, + metrics_registry: Arc, + network_store: &Arc, + logger_factory: &LoggerFactory, +) -> GraphQLQueryServer> { + let shards: Vec<_> = config.stores.keys().cloned().collect(); + let load_manager = Arc::new(LoadManager::new( + &logger, + shards, + expensive_queries, + metrics_registry.clone(), + )); + let graphql_runner = Arc::new(GraphQlRunner::new( + &logger, + network_store.clone(), + load_manager, + metrics_registry, + )); + let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone()); + + graphql_server +} + fn spawn_contention_checker(logger: Logger) { // Periodically check for contention in the tokio threadpool. First spawn a // task that simply responds to "ping" requests. Then spawn a separate From 15fe3c3a131b46eba272775dad9f16433c07647b Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 30 Apr 2025 21:32:53 +0400 Subject: [PATCH 11/11] node/launcher: extract build_subgraph_registrar helper from run --- node/src/launcher.rs | 162 ++++++++++++++++++++++++++----------------- 1 file changed, 99 insertions(+), 63 deletions(-) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 69b5e848be9..1167cc08e1a 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -19,7 +19,7 @@ use graph::log::logger; use graph::prelude::*; use graph::prometheus::Registry; use graph::url::Url; -use graph_core::polling_monitor::{arweave_service, ipfs_service}; +use graph_core::polling_monitor::{arweave_service, ipfs_service, ArweaveService, IpfsService}; use graph_core::{ SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar as IpfsSubgraphRegistrar, @@ -31,7 +31,7 @@ use graph_server_json_rpc::JsonRpcServer; use graph_server_metrics::PrometheusMetricsServer; use graph_store_postgres::{ register_jobs as register_store_jobs, ChainHeadUpdateListener, ConnectionPool, - NotificationSender, Store, SubscriptionManager, + NotificationSender, Store, SubgraphStore, SubscriptionManager, }; use graphman_server::GraphmanServer; use graphman_server::GraphmanServerConfig; @@ -259,6 +259,94 @@ fn deploy_subgraph_from_flag( ); } +fn build_subgraph_registrar( + metrics_registry: Arc, + network_store: &Arc, + logger_factory: &LoggerFactory, + env_vars: &Arc, + blockchain_map: Arc, + node_id: NodeId, + subgraph_settings: Settings, + link_resolver: Arc, + subscription_manager: Arc, + arweave_service: ArweaveService, + ipfs_service: IpfsService, +) -> Arc< + IpfsSubgraphRegistrar< + IpfsSubgraphAssignmentProvider>, + SubgraphStore, + SubscriptionManager, + >, +> { + let static_filters = ENV_VARS.experimental_static_filters; + let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone())); + + let subgraph_instance_manager = SubgraphInstanceManager::new( + &logger_factory, + env_vars.cheap_clone(), + network_store.subgraph_store(), + blockchain_map.cheap_clone(), + sg_count.cheap_clone(), + metrics_registry.clone(), + link_resolver.clone(), + ipfs_service, + arweave_service, + static_filters, + ); + + // Create IPFS-based subgraph provider + let subgraph_provider = IpfsSubgraphAssignmentProvider::new( + &logger_factory, + link_resolver.clone(), + subgraph_instance_manager, + sg_count, + ); + + // Check version switching mode environment variable + let version_switching_mode = ENV_VARS.subgraph_version_switching_mode; + + // Create named subgraph provider for resolving subgraph name->ID mappings + let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new( + &logger_factory, + link_resolver, + Arc::new(subgraph_provider), + network_store.subgraph_store(), + subscription_manager, + blockchain_map, + node_id.clone(), + version_switching_mode, + Arc::new(subgraph_settings), + )); + + subgraph_registrar +} + +fn build_graphql_server( + config: &Config, + logger: &Logger, + expensive_queries: Vec>, + metrics_registry: Arc, + network_store: &Arc, + logger_factory: &LoggerFactory, +) -> GraphQLQueryServer> { + let shards: Vec<_> = config.stores.keys().cloned().collect(); + let load_manager = Arc::new(LoadManager::new( + &logger, + shards, + expensive_queries, + metrics_registry.clone(), + )); + let graphql_runner = Arc::new(GraphQlRunner::new( + &logger, + network_store.clone(), + load_manager, + metrics_registry, + )); + let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone()); + + graphql_server +} + pub async fn run(opt: Opt, env_vars: Arc) { env_logger::init(); // Set up logger @@ -428,46 +516,20 @@ pub async fn run(opt: Opt, env_vars: Arc) { .await; } - let static_filters = ENV_VARS.experimental_static_filters; - - let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone())); - - let subgraph_instance_manager = SubgraphInstanceManager::new( - &logger_factory, - env_vars.cheap_clone(), - network_store.subgraph_store(), - blockchain_map.cheap_clone(), - sg_count.cheap_clone(), + let subgraph_registrar = build_subgraph_registrar( metrics_registry.clone(), - link_resolver.clone(), - ipfs_service, - arweave_service, - static_filters, - ); - - // Create IPFS-based subgraph provider - let subgraph_provider = IpfsSubgraphAssignmentProvider::new( + &network_store, &logger_factory, + &env_vars, + blockchain_map.clone(), + node_id.clone(), + subgraph_settings, link_resolver.clone(), - subgraph_instance_manager, - sg_count, + subscription_manager, + arweave_service, + ipfs_service, ); - // Check version switching mode environment variable - let version_switching_mode = ENV_VARS.subgraph_version_switching_mode; - - // Create named subgraph provider for resolving subgraph name->ID mappings - let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new( - &logger_factory, - link_resolver, - Arc::new(subgraph_provider), - network_store.subgraph_store(), - subscription_manager, - blockchain_map, - node_id.clone(), - version_switching_mode, - Arc::new(subgraph_settings), - )); graph::spawn( subgraph_registrar .start() @@ -515,32 +577,6 @@ pub async fn run(opt: Opt, env_vars: Arc) { graph::futures03::future::pending::<()>().await; } -fn build_graphql_server( - config: &Config, - logger: &Logger, - expensive_queries: Vec>, - metrics_registry: Arc, - network_store: &Arc, - logger_factory: &LoggerFactory, -) -> GraphQLQueryServer> { - let shards: Vec<_> = config.stores.keys().cloned().collect(); - let load_manager = Arc::new(LoadManager::new( - &logger, - shards, - expensive_queries, - metrics_registry.clone(), - )); - let graphql_runner = Arc::new(GraphQlRunner::new( - &logger, - network_store.clone(), - load_manager, - metrics_registry, - )); - let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone()); - - graphql_server -} - fn spawn_contention_checker(logger: Logger) { // Periodically check for contention in the tokio threadpool. First spawn a // task that simply responds to "ping" requests. Then spawn a separate