Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#8202](https://github.com/thanos-io/thanos/pull/8202) Receive: Unhide `--tsdb.enable-native-histograms` flag
- [#8225](https://github.com/thanos-io/thanos/pull/8225) tools: Extend bucket ls options.
- [#8282](https://github.com/thanos-io/thanos/pull/8282) Force sync writes to meta.json in case of host crash
- [#8308](https://github.com/thanos-io/thanos/pull/8308) Receive: Prometheus counters for pending write requests and series requests

### Added

Expand Down
2 changes: 1 addition & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need

The following formula is used for calculating quorum:

```go mdox-exec="sed -n '1036,1046p' pkg/receive/handler.go"
```go mdox-exec="sed -n '1046,1056p' pkg/receive/handler.go"
// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
func (h *Handler) writeQuorum() int {
// NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes
Expand Down
13 changes: 13 additions & 0 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/prometheus/prometheus/tsdb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -141,6 +142,9 @@ type Handler struct {
writeSamplesTotal *prometheus.HistogramVec
writeTimeseriesTotal *prometheus.HistogramVec

pendingWriteRequests prometheus.Gauge
pendingWriteRequestsCounter atomic.Int32

Limiter *Limiter
}

Expand Down Expand Up @@ -222,6 +226,12 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000},
}, []string{"code", "tenant"},
),
pendingWriteRequests: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_pending_write_requests",
Help: "The number of pending write requests.",
},
),
}

h.forwardRequests.WithLabelValues(labelSuccess)
Expand Down Expand Up @@ -1060,6 +1070,9 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
span, ctx := tracing.StartSpan(ctx, "receive_grpc")
defer span.Finish()

h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
defer h.pendingWriteRequestsCounter.Add(-1)

_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
Expand Down
12 changes: 12 additions & 0 deletions pkg/store/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/store/storepb"

"go.uber.org/atomic"
)

// seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
Expand Down Expand Up @@ -145,6 +147,9 @@ type instrumentedStoreServer struct {
storepb.StoreServer
seriesRequested prometheus.Histogram
chunksRequested prometheus.Histogram

pendingRequests prometheus.Gauge
pendingRequestsCounter atomic.Int32
}

// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
Expand All @@ -161,11 +166,18 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
Help: "Number of requested chunks for Series calls",
Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
}),
pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_store_server_pending_series_requests",
Help: "Number of pending series requests",
}),
}
}

func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
instrumented := newInstrumentedServer(srv)
s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
defer s.pendingRequestsCounter.Add(-1)

if err := s.StoreServer.Series(req, instrumented); err != nil {
return err
}
Expand Down
Loading