Skip to content

Commit e9a5b41

Browse files
Only meta/stream leader responds to consumer list
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 7f870cc commit e9a5b41

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

server/jetstream_api.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4650,6 +4650,32 @@ func (s *Server) jsConsumerListRequest(sub *subscription, c *client, _ *Account,
46504650

46514651
// Clustered mode will invoke a scatter and gather.
46524652
if s.JetStreamIsClustered() {
4653+
// Determine if we should proceed here when we are in clustered mode.
4654+
js, cc := s.getJetStreamCluster()
4655+
if js == nil || cc == nil {
4656+
return
4657+
}
4658+
if js.isLeaderless() {
4659+
resp.Error = NewJSClusterNotAvailError()
4660+
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
4661+
return
4662+
}
4663+
isLeader := s.JetStreamIsLeader()
4664+
js.mu.RLock()
4665+
sa := js.streamAssignment(acc.Name, streamName)
4666+
if sa != nil && sa.Config != nil && sa.Config.ManagesConsumers {
4667+
// If the stream manages its own consumers then we care about the stream
4668+
// leader at this point, not the metaleader, as the metaleader will not
4669+
// know or not whether the consumer is meant to exist.
4670+
isLeader = cc.isStreamLeader(acc.Name, streamName)
4671+
}
4672+
js.mu.RUnlock()
4673+
4674+
// Make sure we are leader.
4675+
if !isLeader {
4676+
return
4677+
}
4678+
46534679
// Need to copy these off before sending.. don't move this inside startGoRoutine!!!
46544680
msg = copyBytes(msg)
46554681
s.startGoRoutine(func() {

0 commit comments

Comments
 (0)