Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) {
if !errors.Is(err, ErrClientClosed) && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "operation was canceled") {
if errors.Is(err, io.EOF) {
b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker due to an immediate EOF, which often means the client is using TLS when the broker is not expecting it (is TLS misconfigured?)", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
return nil, &ErrFirstReadEOF{kind: firstReadDial, err: err}
return nil, &ErrFirstReadEOF{kind: firstReadDial, err: err, retry: b.cl.cfg.alwaysRetryEOF}
}
b.cl.cfg.logger.Log(LogLevelWarn, "unable to open connection to broker", "addr", b.addr, "broker", logID(b.meta.NodeID), "err", err)
}
Expand Down Expand Up @@ -807,7 +807,7 @@ start:
return &errApiVersionsReset{err}
} else if errors.Is(err, io.EOF) {
cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker received EOF during api versions discovery, which often happens when the broker requires TLS and the client is not using it (is TLS misconfigured?)", "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
err = &ErrFirstReadEOF{kind: firstReadTLS, err: err}
err = &ErrFirstReadEOF{kind: firstReadTLS, err: err, retry: cxn.b.cl.cfg.alwaysRetryEOF}
}
return err
}
Expand Down Expand Up @@ -1553,7 +1553,7 @@ func (cxn *brokerCxn) handleResp(pr promisedResp) {
} else {
cxn.b.cl.cfg.logger.Log(LogLevelWarn, "read from broker errored, killing connection after 0 successful responses (is SASL missing?)", "req", kmsg.Key(pr.resp.Key()).Name(), "addr", cxn.b.addr, "broker", logID(cxn.b.meta.NodeID), "err", err)
if err == io.EOF { // specifically avoid checking errors.Is to ensure this is not already wrapped
err = &ErrFirstReadEOF{kind: firstReadSASL, err: err}
err = &ErrFirstReadEOF{kind: firstReadSASL, err: err, retry: cxn.b.cl.cfg.alwaysRetryEOF}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ func (cl *Client) OptValues(opt any) []any {
return []any{cfg.disableClientMetrics}
case namefn(UserMetricsFn):
return []any{cfg.userMetrics}
case namefn(AlwaysRetryEOF):
return []any{cfg.alwaysRetryEOF}

case namefn(DefaultProduceTopic):
return []any{cfg.defaultProduceTopic}
Expand Down
20 changes: 20 additions & 0 deletions pkg/kgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type cfg struct {

sasls []sasl.Mechanism

alwaysRetryEOF bool
allowAutoTopicCreation bool
disableClientMetrics bool
userMetrics func() iter.Seq[Metric]
Expand Down Expand Up @@ -987,6 +988,25 @@ func UserMetricsFn(fn func() iter.Seq[Metric]) Opt {
return clientOpt{func(cfg *cfg) { cfg.userMetrics = fn }}
}

// AlwaysRetryEOF opts the client into retrying *every* EOF, including an EOF
// hit on the very first request written to or read from a connection.
//
// Without this option, an EOF on the FIRST request of a connection is treated
// as non-retryable. An EOF carries no information about why the connection
// was closed. With a correct configuration, it usually comes from timeouts or
// from connections being cut on heavily loaded systems. With an incorrect
// configuration (on the client or on the broker), it often comes from a TLS
// settings mismatch or missing SASL credentials, and that case is very hard
// to debug. For this reason, and after much feedback, the client assumes an
// immediate EOF means invalid configuration. If you *know* your configuration
// is correct, use this option so that such requests are retried and can
// succeed as they normally should on very busy systems.
func AlwaysRetryEOF() Opt {
	// Flip the config flag that the broker code copies into each
	// ErrFirstReadEOF it creates.
	enable := func(c *cfg) { c.alwaysRetryEOF = true }
	return clientOpt{enable}
}

////////////////////////////
// PRODUCER CONFIGURATION //
////////////////////////////
Expand Down
7 changes: 4 additions & 3 deletions pkg/kgo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func isRetryableBrokerErr(err error) bool {
// If the FIRST read is EOF, that is usually not a good sign,
// often it's from bad SASL. We err on the side of pessimism
// and do not retry.
if ee := (*ErrFirstReadEOF)(nil); errors.As(err, &ee) {
if ee := (*ErrFirstReadEOF)(nil); errors.As(err, &ee) && !ee.retry {
return false
}
return true
Expand Down Expand Up @@ -238,8 +238,9 @@ var (
// the connection truly was severed before a response was received), but this
// error can help you quickly check common problems.
type ErrFirstReadEOF struct {
	// kind records which first-read stage hit the EOF; the construction
	// sites use firstReadDial, firstReadTLS, and firstReadSASL.
	kind uint8
	// err is the underlying EOF error being wrapped.
	err error
	// retry, when true, stops isRetryableBrokerErr from rejecting this error
	// as non-retryable; it is populated from the client's alwaysRetryEOF
	// configuration.
	retry bool
}

type errProducerIDLoadFail struct {
Expand Down