Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f292c73
metrics introduced
AnmolxSingh Apr 9, 2025
64d185b
lint fix
AnmolxSingh Apr 9, 2025
7e9e2ba
Merge branch 'main' into metrics
AnmolxSingh Apr 10, 2025
17756d8
lint checks fixed
AnmolxSingh Apr 10, 2025
519a5f5
Merge branch 'main' into metrics
AnmolxSingh Apr 11, 2025
e2dfb3c
suggestions done
AnmolxSingh Apr 11, 2025
5acecc4
indexcreate -> spanwrite
AnmolxSingh Apr 11, 2025
277cabb
Update internal/storage/elasticsearch/config/config.go
AnmolxSingh Apr 11, 2025
9ff5576
Merge branch 'main' into metrics
AnmolxSingh Apr 13, 2025
628f621
reduntant code fixed
AnmolxSingh Apr 13, 2025
8d7d850
Merge branch 'main' into metrics
AnmolxSingh Apr 15, 2025
06c2db4
update writer.go
AnmolxSingh Apr 15, 2025
e8e44b7
Merge branch 'main' into metrics
AnmolxSingh Apr 16, 2025
ff2cf86
test failure fixed
AnmolxSingh Apr 16, 2025
afc5dec
unit tests fixed
AnmolxSingh Apr 16, 2025
2f18f7b
fix tests
AnmolxSingh Apr 16, 2025
bd3eb21
fix tests
AnmolxSingh Apr 19, 2025
54e289f
merge conflicts
AnmolxSingh Apr 23, 2025
b97db50
removal of reduntant code
AnmolxSingh Apr 23, 2025
b893f2c
cleanup
yurishkuro Apr 27, 2025
392854f
Merge branch 'main' into metrics
yurishkuro Apr 27, 2025
a5b2ab4
fix
yurishkuro Apr 27, 2025
b768dfc
fmt
yurishkuro Apr 27, 2025
b7733ac
tests
AnmolxSingh Apr 30, 2025
1048941
fix tests
AnmolxSingh May 4, 2025
2adbcd7
fix
AnmolxSingh May 6, 2025
3d1c711
fix
AnmolxSingh May 6, 2025
e6b93d3
fix
AnmolxSingh May 6, 2025
39ddd02
fix
AnmolxSingh May 11, 2025
6b6d8ea
fix
AnmolxSingh May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions internal/storage/elasticsearch/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,13 @@
After(func(id int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
start, ok := m.Load(id)
if !ok {
return
m.Delete(id)

Check warning on line 238 in internal/storage/elasticsearch/config/config.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/elasticsearch/config/config.go#L238

Added line #L238 was not covered by tests
} else {
start = time.Now()
}
m.Delete(id)

duration := time.Since(start.(time.Time))
var successCount, failureCount int

// log individual errors, note that err might be false and these errors still present
if response != nil && response.Errors {
Expand All @@ -246,11 +250,27 @@
if val.Error != nil {
logger.Error("Elasticsearch part of bulk request failed", zap.String("map-key", key),
zap.Reflect("response", val))
} else {
successCount++

Check warning on line 254 in internal/storage/elasticsearch/config/config.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/elasticsearch/config/config.go#L253-L254

Added lines #L253 - L254 were not covered by tests
}
}
}
} else if response != nil {
// All succeeded
successCount = len(response.Items)
}

sm.Inserts.Inc(int64(successCount))
sm.Errors.Inc(int64(failureCount))

if err != nil || failureCount > 0 {
sm.LatencyErr.Record(duration)

Check warning on line 267 in internal/storage/elasticsearch/config/config.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/elasticsearch/config/config.go#L267

Added line #L267 was not covered by tests
} else {
sm.LatencyOk.Record(duration)
}

sm.Emit(err, duration)

sm.Emit(err, time.Since(start.(time.Time)))
if err != nil {
var failed int
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 17 additions & 4 deletions internal/storage/v1/elasticsearch/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
es "github.com/jaegertracing/jaeger/internal/storage/elasticsearch"
cfg "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config"
"github.com/jaegertracing/jaeger/internal/storage/elasticsearch/dbmodel"
"github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/spanstoremetrics"
)

const (
Expand All @@ -25,13 +26,18 @@ const (
indexCacheTTLDefault = 48 * time.Hour
)

type spanWriterMetrics struct {
indexCreate *spanstoremetrics.WriteMetrics
}

type serviceWriter func(string, *dbmodel.Span)

// SpanWriter is a wrapper around elastic.Client
type SpanWriter struct {
client func() es.Client
logger *zap.Logger
// indexCache cache.Cache
writerMetrics spanWriterMetrics
serviceWriter serviceWriter
spanServiceIndex spanAndServiceIndexFn
}
Expand All @@ -41,7 +47,7 @@ type CoreSpanWriter interface {
// CreateTemplates creates index templates.
CreateTemplates(spanTemplate, serviceTemplate string, indexPrefix cfg.IndexPrefix) error
// WriteSpan writes a span and its corresponding service:operation in ElasticSearch
WriteSpan(spanStartTime time.Time, span *dbmodel.Span)
WriteSpan(spanStartTime time.Time, span *dbmodel.Span) error
// Close closes CoreSpanWriter
Close() error
}
Expand Down Expand Up @@ -80,8 +86,11 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter {

serviceOperationStorage := NewServiceOperationStorage(p.Client, p.Logger, serviceCacheTTL)
return &SpanWriter{
client: p.Client,
logger: p.Logger,
client: p.Client,
logger: p.Logger,
writerMetrics: spanWriterMetrics{
indexCreate: spanstoremetrics.NewWriter(p.MetricsFactory, "index_create"),
},
serviceWriter: serviceOperationStorage.Write,
spanServiceIndex: getSpanAndServiceIndexFn(p, writeAliasSuffix),
}
Expand Down Expand Up @@ -119,13 +128,17 @@ func getSpanAndServiceIndexFn(p SpanWriterParams, writeAlias string) spanAndServ
}

// WriteSpan writes a span and its corresponding service:operation in ElasticSearch
func (s *SpanWriter) WriteSpan(spanStartTime time.Time, span *dbmodel.Span) {
func (s *SpanWriter) WriteSpan(spanStartTime time.Time, span *dbmodel.Span) error {
s.writerMetrics.indexCreate.Attempts.Inc(1)

spanIndexName, serviceIndexName := s.spanServiceIndex(spanStartTime)
if serviceIndexName != "" {
s.writeService(serviceIndexName, span)
}
s.writeSpan(spanIndexName, span)
s.logger.Debug("Wrote span to ES index", zap.String("index", spanIndexName))

return nil
}

// Close closes SpanWriter
Expand Down
3 changes: 1 addition & 2 deletions internal/storage/v1/elasticsearch/spanstore/writerv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func (s *SpanWriterV1) CreateTemplates(spanTemplate, serviceTemplate string, ind
// WriteSpan writes a span and its corresponding service:operation in ElasticSearch
func (s *SpanWriterV1) WriteSpan(_ context.Context, span *model.Span) error {
jsonSpan := s.spanConverter.FromDomainEmbedProcess(span)
s.spanWriter.WriteSpan(span.StartTime, jsonSpan)
return nil
return s.spanWriter.WriteSpan(span.StartTime, jsonSpan)
}

// Close closes SpanWriter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestSpanWriterV1_WriteSpan(t *testing.T) {
}
converter := NewFromDomain(true, []string{}, "-")
writerV1 := &SpanWriterV1{spanWriter: coreWriter, spanConverter: converter}
coreWriter.On("WriteSpan", s.StartTime, converter.FromDomainEmbedProcess(s))
coreWriter.On("WriteSpan", s.StartTime, converter.FromDomainEmbedProcess(s)).Return(nil)
err := writerV1.WriteSpan(context.Background(), s)
require.NoError(t, err)
}
Expand Down
Loading