Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 91336a1

Browse files
committed
Integrate cluster info voting into the TPU and ReplayStage
1 parent aa1e786 commit 91336a1

File tree

9 files changed

+113
-180
lines changed

9 files changed

+113
-180
lines changed

src/broadcast_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ impl BroadcastService {
227227
) {
228228
match e {
229229
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
230-
return BroadcastServiceReturnType::ChannelDisconnected
230+
return BroadcastServiceReturnType::ChannelDisconnected;
231231
}
232232
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
233233
Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?

src/cluster_info.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub const NEIGHBORHOOD_SIZE: usize = DATA_PLANE_FANOUT;
5656
pub const GROW_LAYER_CAPACITY: bool = false;
5757

5858
/// milliseconds we sleep for between gossip requests
59-
const GOSSIP_SLEEP_MILLIS: u64 = 100;
59+
pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
6060

6161
#[derive(Debug, PartialEq, Eq)]
6262
pub enum ClusterInfoError {

src/cluster_info_vote_listener.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS};
2+
use crate::counter::Counter;
3+
use crate::packet;
4+
use crate::result::Result;
5+
use crate::service::Service;
6+
use crate::streamer::PacketSender;
7+
use log::Level;
8+
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9+
use std::sync::{Arc, RwLock};
10+
use std::thread::{self, sleep, Builder, JoinHandle};
11+
use std::time::Duration;
12+
13+
pub struct ClusterInfoVoteListener {
14+
exit: Arc<AtomicBool>,
15+
thread_hdls: Vec<JoinHandle<()>>,
16+
}
17+
18+
impl ClusterInfoVoteListener {
19+
pub fn new(
20+
exit: Arc<AtomicBool>,
21+
cluster_info: Arc<RwLock<ClusterInfo>>,
22+
sender: PacketSender,
23+
) -> Self {
24+
let exit1 = exit.clone();
25+
let thread = Builder::new()
26+
.name("solana-cluster_info_vote_listener".to_string())
27+
.spawn(move || {
28+
let _ = Self::recv_loop(&exit1, &cluster_info, &sender);
29+
})
30+
.unwrap();
31+
Self {
32+
exit,
33+
thread_hdls: vec![thread],
34+
}
35+
}
36+
fn recv_loop(
37+
exit: &Arc<AtomicBool>,
38+
cluster_info: &Arc<RwLock<ClusterInfo>>,
39+
sender: &PacketSender,
40+
) -> Result<()> {
41+
let mut last_ts = 0;
42+
loop {
43+
if exit.load(Ordering::Relaxed) {
44+
return Ok(());
45+
}
46+
let (votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
47+
last_ts = new_ts;
48+
inc_new_counter_info!("cluster_info_vote_listener-recv_count", votes.len());
49+
let msgs = packet::to_packets(&votes);
50+
for m in msgs {
51+
sender.send(m)?;
52+
}
53+
sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
54+
}
55+
}
56+
pub fn close(&self) {
57+
self.exit.store(true, Ordering::Relaxed);
58+
}
59+
}
60+
61+
impl Service for ClusterInfoVoteListener {
62+
type JoinReturnType = ();
63+
64+
fn join(self) -> thread::Result<()> {
65+
for thread_hdl in self.thread_hdls {
66+
thread_hdl.join()?;
67+
}
68+
Ok(())
69+
}
70+
}

src/fetch_stage.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
22
33
use crate::service::Service;
4-
use crate::streamer::{self, PacketReceiver};
4+
use crate::streamer::{self, PacketReceiver, PacketSender};
55
use std::net::UdpSocket;
66
use std::sync::atomic::{AtomicBool, Ordering};
77
use std::sync::mpsc::channel;
@@ -16,20 +16,28 @@ pub struct FetchStage {
1616
impl FetchStage {
1717
#[allow(clippy::new_ret_no_self)]
1818
pub fn new(sockets: Vec<UdpSocket>, exit: Arc<AtomicBool>) -> (Self, PacketReceiver) {
19+
let (sender, receiver) = channel();
20+
(Self::new_with_sender(sockets, exit, sender), receiver)
21+
}
22+
pub fn new_with_sender(
23+
sockets: Vec<UdpSocket>,
24+
exit: Arc<AtomicBool>,
25+
sender: PacketSender,
26+
) -> Self {
1927
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
20-
Self::new_multi_socket(tx_sockets, exit)
28+
Self::new_multi_socket(tx_sockets, exit, sender)
2129
}
2230
fn new_multi_socket(
2331
sockets: Vec<Arc<UdpSocket>>,
2432
exit: Arc<AtomicBool>,
25-
) -> (Self, PacketReceiver) {
26-
let (sender, receiver) = channel();
33+
sender: PacketSender,
34+
) -> Self {
2735
let thread_hdls: Vec<_> = sockets
2836
.into_iter()
2937
.map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage"))
3038
.collect();
3139

32-
(Self { exit, thread_hdls }, receiver)
40+
Self { exit, thread_hdls }
3341
}
3442

3543
pub fn close(&self) {

src/fullnode.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ impl Fullnode {
326326
// Start in leader mode.
327327
let (tpu, entry_receiver, tpu_exit) = Tpu::new(
328328
&bank,
329+
cluster_info.clone(),
329330
Default::default(),
330331
node.sockets
331332
.tpu
@@ -483,6 +484,7 @@ impl Fullnode {
483484

484485
let (tpu, blob_receiver, tpu_exit) = Tpu::new(
485486
&self.bank,
487+
self.cluster_info.clone(),
486488
Default::default(),
487489
self.tpu_sockets
488490
.iter()

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub mod chacha;
2121
pub mod chacha_cuda;
2222
pub mod checkpoint;
2323
pub mod client;
24+
pub mod cluster_info_vote_listener;
2425
pub mod crds;
2526
pub mod crds_gossip;
2627
pub mod crds_gossip_error;

src/replay_stage.rs

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@ use crate::leader_scheduler::TICKS_PER_BLOCK;
1111
use crate::packet::BlobError;
1212
use crate::result::{Error, Result};
1313
use crate::service::Service;
14-
use crate::streamer::{responder, BlobSender};
1514
use crate::vote_signer_proxy::VoteSignerProxy;
1615
use log::Level;
1716
use solana_metrics::{influxdb, submit};
1817
use solana_sdk::signature::{Keypair, KeypairUtil};
1918
use solana_sdk::timing::duration_as_ms;
20-
use std::net::UdpSocket;
2119
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2220
use std::sync::mpsc::channel;
2321
use std::sync::mpsc::RecvTimeoutError;
@@ -52,7 +50,6 @@ impl Drop for Finalizer {
5250
}
5351

5452
pub struct ReplayStage {
55-
t_responder: JoinHandle<()>,
5653
t_replay: JoinHandle<Option<ReplayStageReturnType>>,
5754
}
5855

@@ -65,7 +62,6 @@ impl ReplayStage {
6562
window_receiver: &EntryReceiver,
6663
keypair: &Arc<Keypair>,
6764
vote_signer: Option<&Arc<VoteSignerProxy>>,
68-
vote_blob_sender: Option<&BlobSender>,
6965
ledger_entry_sender: &EntrySender,
7066
entry_height: &mut u64,
7167
last_entry_id: &mut Hash,
@@ -138,11 +134,8 @@ impl ReplayStage {
138134

139135
if 0 == num_ticks_to_next_vote {
140136
if let Some(signer) = vote_signer {
141-
if let Some(sender) = vote_blob_sender {
142-
signer
143-
.send_validator_vote(bank, &cluster_info, sender)
144-
.unwrap();
145-
}
137+
let vote = signer.validator_vote(bank);
138+
cluster_info.write().unwrap().push_vote(vote);
146139
}
147140
}
148141
let (scheduled_leader, _) = bank
@@ -204,10 +197,7 @@ impl ReplayStage {
204197
entry_height: u64,
205198
last_entry_id: Hash,
206199
) -> (Self, EntryReceiver) {
207-
let (vote_blob_sender, vote_blob_receiver) = channel();
208200
let (ledger_entry_sender, ledger_entry_receiver) = channel();
209-
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
210-
let t_responder = responder("replay_stage", Arc::new(send), vote_blob_receiver);
211201

212202
let keypair = Arc::new(keypair);
213203
let t_replay = Builder::new()
@@ -243,7 +233,6 @@ impl ReplayStage {
243233
&window_receiver,
244234
&keypair,
245235
vote_signer.as_ref(),
246-
Some(&vote_blob_sender),
247236
&ledger_entry_sender,
248237
&mut entry_height_,
249238
&mut last_entry_id,
@@ -259,21 +248,14 @@ impl ReplayStage {
259248
})
260249
.unwrap();
261250

262-
(
263-
Self {
264-
t_responder,
265-
t_replay,
266-
},
267-
ledger_entry_receiver,
268-
)
251+
(Self { t_replay }, ledger_entry_receiver)
269252
}
270253
}
271254

272255
impl Service for ReplayStage {
273256
type JoinReturnType = Option<ReplayStageReturnType>;
274257

275258
fn join(self) -> thread::Result<Option<ReplayStageReturnType>> {
276-
self.t_responder.join()?;
277259
self.t_replay.join()
278260
}
279261
}
@@ -486,10 +468,8 @@ mod test {
486468
last_entry_id,
487469
);
488470

489-
// Vote sender should error because no leader contact info is found in the
490-
// ClusterInfo
491-
let (mock_sender, _mock_receiver) = channel();
492-
let _vote_err = vote_signer.send_validator_vote(&bank, &cluster_info_me, &mock_sender);
471+
let vote = vote_signer.validator_vote(&bank);
472+
cluster_info_me.write().unwrap().push_vote(vote);
493473

494474
// Send ReplayStage an entry, should see it on the ledger writer receiver
495475
let next_tick = create_ticks(
@@ -602,10 +582,8 @@ mod test {
602582
last_entry_id,
603583
);
604584

605-
// Vote sender should error because no leader contact info is found in the
606-
// ClusterInfo
607-
let (mock_sender, _mock_receiver) = channel();
608-
let _vote_err = signer_proxy.send_validator_vote(&bank, &cluster_info_me, &mock_sender);
585+
let vote = signer_proxy.validator_vote(&bank);
586+
cluster_info_me.write().unwrap().push_vote(vote);
609587

610588
// Send enough ticks to trigger leader rotation
611589
let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize;
@@ -685,7 +663,6 @@ mod test {
685663
&entry_receiver,
686664
&my_keypair,
687665
Some(&vote_signer),
688-
None,
689666
&ledger_entry_sender,
690667
&mut entry_height,
691668
&mut last_entry_id,
@@ -711,7 +688,6 @@ mod test {
711688
&entry_receiver,
712689
&Arc::new(Keypair::new()),
713690
Some(&vote_signer),
714-
None,
715691
&ledger_entry_sender,
716692
&mut entry_height,
717693
&mut last_entry_id,

src/tpu.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
44
use crate::bank::Bank;
55
use crate::banking_stage::{BankingStage, BankingStageReturnType};
6+
use crate::cluster_info::ClusterInfo;
7+
use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
68
use crate::entry::Entry;
79
use crate::fetch_stage::FetchStage;
810
use crate::poh_service::Config;
@@ -12,8 +14,9 @@ use solana_sdk::hash::Hash;
1214
use solana_sdk::pubkey::Pubkey;
1315
use std::net::UdpSocket;
1416
use std::sync::atomic::{AtomicBool, Ordering};
17+
use std::sync::mpsc::channel;
1518
use std::sync::mpsc::Receiver;
16-
use std::sync::Arc;
19+
use std::sync::{Arc, RwLock};
1720
use std::thread;
1821

1922
pub enum TpuReturnType {
@@ -24,13 +27,15 @@ pub struct Tpu {
2427
fetch_stage: FetchStage,
2528
sigverify_stage: SigVerifyStage,
2629
banking_stage: BankingStage,
30+
cluster_info_vote_listener: ClusterInfoVoteListener,
2731
exit: Arc<AtomicBool>,
2832
}
2933

3034
impl Tpu {
3135
#[allow(clippy::new_ret_no_self)]
3236
pub fn new(
3337
bank: &Arc<Bank>,
38+
cluster_info: Arc<RwLock<ClusterInfo>>,
3439
tick_duration: Config,
3540
transactions_sockets: Vec<UdpSocket>,
3641
sigverify_disabled: bool,
@@ -40,7 +45,11 @@ impl Tpu {
4045
) -> (Self, Receiver<Vec<Entry>>, Arc<AtomicBool>) {
4146
let exit = Arc::new(AtomicBool::new(false));
4247

43-
let (fetch_stage, packet_receiver) = FetchStage::new(transactions_sockets, exit.clone());
48+
let (packet_sender, packet_receiver) = channel();
49+
let fetch_stage =
50+
FetchStage::new_with_sender(transactions_sockets, exit.clone(), packet_sender.clone());
51+
let cluster_info_vote_listener =
52+
ClusterInfoVoteListener::new(exit.clone(), cluster_info, packet_sender);
4453

4554
let (sigverify_stage, verified_receiver) =
4655
SigVerifyStage::new(packet_receiver, sigverify_disabled);
@@ -58,6 +67,7 @@ impl Tpu {
5867
fetch_stage,
5968
sigverify_stage,
6069
banking_stage,
70+
cluster_info_vote_listener,
6171
exit: exit.clone(),
6272
};
6373

@@ -84,6 +94,7 @@ impl Service for Tpu {
8494
fn join(self) -> thread::Result<(Option<TpuReturnType>)> {
8595
self.fetch_stage.join()?;
8696
self.sigverify_stage.join()?;
97+
self.cluster_info_vote_listener.join()?;
8798
match self.banking_stage.join()? {
8899
Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)),
89100
_ => Ok(None),

0 commit comments

Comments
 (0)