diff --git a/src/progress/progress_set.rs b/src/progress/progress_set.rs index 58530444e..76b4e6469 100644 --- a/src/progress/progress_set.rs +++ b/src/progress/progress_set.rs @@ -424,11 +424,7 @@ impl ProgressSet { pub fn quorum_recently_active(&mut self, perspective_of: u64, quorum_fn: QuorumFn) -> bool { let mut active = HashSet::default(); for (&id, pr) in self.voters_mut() { - if id == perspective_of { - active.insert(id); - continue; - } - if pr.recent_active { + if id == perspective_of || pr.recent_active { active.insert(id); } } diff --git a/src/raft.rs b/src/raft.rs index 345c51138..bd4842550 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -152,8 +152,8 @@ pub struct Raft { /// The queue of read-only requests. pub read_only: ReadOnly, - /// Ticks since it reached last electionTimeout when it is leader or candidate. - /// Number of ticks since it reached last electionTimeout or received a + /// Ticks since it reached last election_timeout when it is leader or candidate. + /// Number of ticks since it reached last election_timeout or received a /// valid message from current leader when it is a follower. pub election_elapsed: usize, @@ -547,12 +547,11 @@ impl Raft { fn try_batching(&mut self, to: u64, pr: &mut Progress, ents: &mut Vec) -> bool { // if MsgAppend for the receiver already exists, try_batching // will append the entries to the existing MsgAppend - let mut is_batched = false; for msg in &mut self.msgs { if msg.get_msg_type() == MessageType::MsgAppend && msg.to == to { if !ents.is_empty() { if !util::is_continuous_ents(msg, ents) { - return is_batched; + return false; } let mut batched_entries: Vec<_> = msg.take_entries().into(); batched_entries.append(ents); @@ -561,11 +560,10 @@ impl Raft { pr.update_state(last_idx); } msg.commit = self.raft_log.committed; - is_batched = true; - break; + return true; } } - is_batched + false } /// Sends an append RPC with new entries (if any) and the current commit index to the given @@ -621,7 +619,7 @@ impl Raft { true } - // send_heartbeat sends an empty MsgAppend + // Sends heartbeat to a node. fn send_heartbeat(&mut self, to: u64, pr: &Progress, ctx: Option>) { // Attach the commit as min(to.matched, self.raft_log.committed). // When the leader sends out heartbeat message, @@ -658,7 +656,7 @@ impl Raft { } } - /// Sends RPC, without entries to all the peers. + /// Sends heartbeat RPC to all the peers. pub fn bcast_heartbeat(&mut self) { let ctx = self.read_only.last_pending_request_ctx(); self.bcast_heartbeat_with_ctx(ctx) @@ -818,7 +816,7 @@ impl Raft { /// /// # Panics /// - /// Panics if a leader already exists. + /// Panics if this raft is already a leader. pub fn become_candidate(&mut self) { assert_ne!( self.state, @@ -841,7 +839,7 @@ impl Raft { /// /// # Panics /// - /// Panics if a leader already exists. + /// Panics if this raft is already a leader. pub fn become_pre_candidate(&mut self) { assert_ne!( self.state, @@ -913,11 +911,11 @@ impl Raft { /// Campaign to attempt to become a leader. /// - /// If prevote is enabled, this is handled as well. + /// If prevote is enabled, it is handled as well. pub fn campaign(&mut self, campaign_type: &[u8]) { let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION { self.become_pre_candidate(); - // Pre-vote RPCs are sent for next term before we've incremented self.term. + // Pre-vote RPCs are sent for next term before we increase self.term. (MessageType::MsgRequestPreVote, self.term + 1) } else { self.become_candidate(); @@ -1052,18 +1050,19 @@ impl Raft { // that these messages were simply delayed in the network, but this could // also mean that this node has advanced its term number during a network // partition, and it is now unable to either win an election or to rejoin - // the majority on the old term. If checkQuorum is false, this will be - // handled by incrementing term numbers in response to MsgVote with a higher - // term, but if checkQuorum is true we may not advance the term on MsgVote and - // must generate other messages to advance the term. The net result of these - // two features is to minimize the disruption caused by nodes that have been - // removed from the cluster's configuration: a removed node will send MsgVotes - // which will be ignored, but it will not receive MsgApp or MsgHeartbeat, so it - // will not create disruptive term increases, by notifying leader of this node's - // activeness. - // The above comments also true for Pre-Vote + // the majority on the old term. If check_quorum is false, this will be + // handled by incrementing term numbers of other nodes in response to + // RequestVote with a higher term. But if check_quorum is true, other + // nodes may not advance terms on RequestVote and must generate other + // messages to advance them. The net result of check_quorum and pre_vote + // is to minimize the disruption caused by nodes that have been removed + // from the cluster's configuration: a removed node will send RequestVotes + // which will be ignored, but it will not receive MsgAppend or + // MsgHeartbeat, so it will not create disruptive term increases, by + // notifying leader of this node's activeness. + // The above comments also hold for Pre-Vote // - // When follower gets isolated, it soon starts an election ending + // When a follower gets isolated, it soon starts an election ending // up with a higher term than leader, although it won't receive enough // votes to win the election. When it regains connectivity, this response // with "pb.MsgAppResp" of higher term would force leader to step down. @@ -1072,9 +1071,14 @@ impl Raft { let to_send = new_message(m.from, MessageType::MsgAppendResponse, None); self.send(to_send); } else if m.get_msg_type() == MessageType::MsgRequestPreVote { - // Before pre_vote enable, there may be a recieving candidate with higher term, - // but less log. After update to pre_vote, the cluster may deadlock if - // we drop messages with a lower term. + // Unlike normal election, both sender and receiver of MsgReqPreVote{Resp} + // won't update their terms accordingly. This makes it possible that the + // nodes with more logs stuck at a lower term, and can never win an election. + // Thus the whole cluster may deadlock. + // To solve this problem, we explicitly reject MsgRequestPreVote, to hint + // the candidate to update its term. + // Notice that in a normal election, voters will bump its term in response + // to MsgReqPreVote, and solve the deadlock. info!( self.logger, "{} [log_term: {}, index: {}, vote: {}] rejected {:?} from {} [log_term: {}, index: {}] at term {}", @@ -1110,6 +1114,7 @@ impl Raft { #[cfg(feature = "failpoints")] fail_point!("before_step"); + // Now m.term == self.term, unless the message is PreVote related. match m.get_msg_type() { MessageType::MsgHup => self.hup(false), MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => { @@ -1121,15 +1126,14 @@ impl Raft { (m.get_msg_type() == MessageType::MsgRequestPreVote && m.term > self.term); // ...and we believe the candidate is up to date. if can_vote && self.raft_log.is_up_to_date(m.index, m.log_term) { - // When responding to Msg{Pre,}Vote messages we include the term - // from the message, not the local term. To see why consider the - // case where a single node was previously partitioned away and - // it's local term is now of date. If we include the local term - // (recall that for pre-votes we don't update the local term), the - // (pre-)campaigning node on the other end will proceed to ignore - // the message (it ignores all out of date messages). - // The term in the original message and current local term are the - // same in the case of regular votes, but different for pre-votes. + // When responding to MsgReq{Pre,}Vote, we include the term from the + // message, but not the local term. This is because this node may be + // partitioned away and its local term would fall behind. Since we + // don't update local term for pre-votes, the pre-campaigning + // candidate on the other end will proceed to ignore the response + // message (it ignores all out of date messages). + // For regular votes, the message term will always be the same + // as local term here. self.log_vote_approve(&m); let mut to_send = new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None); @@ -1159,6 +1163,7 @@ impl Raft { Ok(()) } + // Up for election. fn hup(&mut self, transfer_leader: bool) { if self.state == StateRole::Leader { debug!( @@ -1328,7 +1333,7 @@ impl Raft { prs: &mut ProgressSet, ctx: &mut HandleResponseContext, ) { - // Update the node. Drop the value explicitly since we'll check the qourum after. + // Update the node. Drop the value explicitly since we'll check the quorum after. { let pr = prs.get_mut(m.from).unwrap(); pr.recent_active = true; @@ -1418,8 +1423,8 @@ impl Raft { term = self.term, lead_transferee = lead_transferee; ); - // Transfer leadership should be finished in one electionTimeout - // so reset r.electionElapsed. + // Transfer leadership should be finished in one election_timeout + // so reset r.election_elapsed. self.election_elapsed = 0; self.lead_transferee = Some(lead_transferee); let pr = prs.get_mut(from).unwrap(); @@ -1531,7 +1536,7 @@ impl Raft { fatal!(self.logger, "stepped empty MsgProp"); } if !self.prs().voter_ids().contains(&self.id) { - // If we are not currently a member of the range (i.e. this node + // If we are not currently a voter (i.e. this node // was removed from the configuration while serving as leader), // drop any new proposals. return Err(Error::ProposalDropped); @@ -1578,7 +1583,7 @@ impl Raft { let mut self_set = HashSet::default(); self_set.insert(self.id); if !self.prs().has_quorum(&self_set, self.quorum_fn) { - // thinking: use an interally defined context instead of the user given context. + // thinking: use an internally defined context instead of the user given context. // We can express this in terms of the term and index instead of // a user-supplied value. // This would allow multiple reads to piggyback on the same message. @@ -1669,7 +1674,7 @@ impl Raft { Ok(()) } - /// Check if it can become leader. + /// Check if this raft can become leader. fn check_votes(&mut self) -> Option { match self.prs().candidacy_status(&self.votes, self.quorum_fn) { CandidacyStatus::Elected => { @@ -1893,23 +1898,19 @@ impl Raft { self.send_request_snapshot(); return; } + let mut to_send = Message::default(); + to_send.set_msg_type(MessageType::MsgAppendResponse); + to_send.to = m.from; if m.index < self.raft_log.committed { debug!( self.logger, "got message with lower index than committed."; ); - let mut to_send = Message::default(); - to_send.set_msg_type(MessageType::MsgAppendResponse); - to_send.to = m.from; to_send.index = self.raft_log.committed; self.send(to_send); return; } - let mut to_send = Message::default(); - to_send.to = m.from; - to_send.set_msg_type(MessageType::MsgAppendResponse); - if let Some((_, last_idx)) = self .raft_log .maybe_append(m.index, m.log_term, m.commit, &m.entries) @@ -1935,7 +1936,7 @@ impl Raft { } // TODO: revoke pub when there is a better way to test. - /// For a message, commit and send out heartbeat. + /// Handles MsgHeartbeat message. Update log commit and send out heartbeat response. pub fn handle_heartbeat(&mut self, mut m: Message) { self.raft_log.commit_to(m.commit); if self.pending_request_snapshot != INVALID_INDEX { @@ -2193,7 +2194,7 @@ impl Raft { } // TODO: revoke pub when there is a better way to test. - /// For a given hardstate, load the state into self. + /// Loads a given hardstate into self. pub fn load_state(&mut self, hs: &HardState) { if hs.commit < self.raft_log.committed || hs.commit > self.raft_log.last_index() { fatal!( @@ -2209,8 +2210,7 @@ impl Raft { self.vote = hs.vote; } - /// `pass_election_timeout` returns true iff `election_elapsed` is greater - /// than or equal to the randomized election timeout in + /// Returns whether `election_elapsed` has passed the randomized election timeout in /// [`election_timeout`, 2 * `election_timeout` - 1]. pub fn pass_election_timeout(&self) -> bool { self.election_elapsed >= self.randomized_election_timeout @@ -2231,18 +2231,17 @@ impl Raft { self.randomized_election_timeout = timeout; } - // check_quorum_active returns true if the quorum is active from - // the view of the local raft state machine. Otherwise, it returns - // false. + // Returns whether the quorum is active from the view of the local raft state machine. // check_quorum_active also resets all recent_active to false. - // check_quorum_active can only called by leader. + // check_quorum_active can only be called by leader. fn check_quorum_active(&mut self) -> bool { let self_id = self.id; let quorum_fn = self.quorum_fn; self.mut_prs().quorum_recently_active(self_id, quorum_fn) } - /// Issues a message to timeout immediately. + /// Issues a message to make a node timeout immediately. If the node is promotable, + /// it will start a campaign soon. pub fn send_timeout_now(&mut self, to: u64) { let msg = new_message(to, MessageType::MsgTimeoutNow, None); self.send(msg); diff --git a/src/raft_log.rs b/src/raft_log.rs index 83973cf1f..66a82d691 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -106,7 +106,7 @@ impl RaftLog { /// For a given index, finds the term associated with it. pub fn term(&self, idx: u64) -> Result { - // the valid term range is [index of dummy entry, last index] + // the valid index range is [index of dummy entry, last index] let dummy_idx = self.first_index() - 1; if idx < dummy_idx || idx > self.last_index() { return Ok(0u64); @@ -149,21 +149,17 @@ impl RaftLog { } } - /// Finds the index of the conflict. + /// Finds index of the conflict entry. /// - /// It returns the first index of conflicting entries between the existing - /// entries and the given entries, if there are any. + /// It returns the index of first conflicting entry between existing log + /// and the given entry list, if any. /// - /// If there are no conflicting entries, and the existing entries contain - /// all the given entries, zero will be returned. + /// When there is no conflict, it will return the index of first new entry. + /// If there's no new entry, zero will be returned. /// - /// If there are no conflicting entries, but the given entries contains new - /// entries, the index of the first new entry will be returned. + /// An entry is considered to be conflicting if it has the same index with + /// existing one but has a different term. /// - /// An entry is considered to be conflicting if it has the same index but - /// a different term. - /// - /// The first entry MUST have an index equal to the argument 'from'. /// The index of the given entries MUST be continuously increasing. pub fn find_conflict(&self, ents: &[Entry]) -> u64 { for e in ents { @@ -288,6 +284,7 @@ impl RaftLog { } /// Appends a set of entries to the unstable list. + /// Returns the last index after append. pub fn append(&mut self, ents: &[Entry]) -> u64 { trace!( self.unstable.logger, @@ -344,7 +341,7 @@ impl RaftLog { } } - /// Determines if the given (lastIndex,term) log is more up-to-date + /// Determines if the given (last_index, term) log is more up-to-date /// by comparing the index and term of the last entry in the existing logs. /// If the logs have last entry with different terms, then the log with the /// later term is more up-to-date. If the logs end with the same term, then @@ -374,18 +371,18 @@ impl RaftLog { self.next_entries_since(self.applied) } - /// Returns whether there are entries that can be applied between `since_idx` and the comitted index. + /// Returns whether there are entries that can be applied between `since_idx` and the committed index. pub fn has_next_entries_since(&self, since_idx: u64) -> bool { let offset = cmp::max(since_idx + 1, self.first_index()); self.committed + 1 > offset } - /// Returns whether there are new entries. + /// Returns whether there are unapplied entries. pub fn has_next_entries(&self) -> bool { self.has_next_entries_since(self.applied) } - /// Returns the current snapshot + /// Returns the current snapshot. pub fn snapshot(&self, request_index: u64) -> Result { if let Some(snap) = self.unstable.snapshot.as_ref() { if snap.get_metadata().index >= request_index { diff --git a/src/raw_node.rs b/src/raw_node.rs index e5c385178..204ba1e62 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -198,7 +198,7 @@ impl Ready { /// RawNode is a thread-unsafe Node. /// The methods of this struct correspond to the methods of Node and are described -/// more fully there. +/// in more detail there. pub struct RawNode { /// The internal raft state. pub raft: Raft, @@ -271,14 +271,14 @@ impl RawNode { self.raft.tick() } - /// Campaign causes this RawNode to transition to candidate state. + /// Campaign causes this RawNode to transit to candidate state. pub fn campaign(&mut self) -> Result<()> { let mut m = Message::default(); m.set_msg_type(MessageType::MsgHup); self.raft.step(m) } - /// Propose proposes data be appended to the raft log. + /// Proposes data to be appended to the raft log. pub fn propose(&mut self, context: Vec, data: Vec) -> Result<()> { let mut m = Message::default(); m.set_msg_type(MessageType::MsgPropose); @@ -292,7 +292,7 @@ impl RawNode { /// Broadcast heartbeats to all the followers. /// - /// If it's not leader, nothing will happen. + /// If current node is not leader, nothing will happen. pub fn ping(&mut self) { self.raft.ping() } @@ -351,12 +351,12 @@ impl RawNode { ) } - /// Ready returns the current point-in-time state of this RawNode. + /// Returns the current point-in-time state of this RawNode. pub fn ready(&mut self) -> Ready { Ready::new(&mut self.raft, &self.prev_ss, &self.prev_hs, None) } - /// Given an index, can determine if there is a ready state from that time. + /// Given an index, determines if a ready state exists since that time. pub fn has_ready_since(&self, applied_idx: Option) -> bool { let raft = &self.raft; if !raft.msgs.is_empty() || raft.raft_log.unstable_entries().is_some() { @@ -385,8 +385,7 @@ impl RawNode { false } - /// HasReady called when RawNode user need to check if any Ready pending. - /// Checking logic in this method should be consistent with Ready.containsUpdates(). + /// Checks if any Ready is pending. #[inline] pub fn has_ready(&self) -> bool { self.has_ready_since(None) @@ -398,7 +397,7 @@ impl RawNode { self.raft.snap() } - /// Advance notifies the RawNode that the application has applied and saved progress in the + /// Notifies the RawNode that the application has applied and saved progress in the /// last Ready results. pub fn advance(&mut self, rd: Ready) { self.advance_append(rd);