Skip to content

Commit baaf5f7

Browse files
authored
Push EMF logs in batch (#2572)
1 parent 0358399 commit baaf5f7

File tree

1 file changed

+45
-21
lines changed

1 file changed

+45
-21
lines changed

exporter/awsemfexporter/emf_exporter.go

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ func NewEmfExporter(
8989
config configmodels.Exporter,
9090
params component.ExporterCreateParams,
9191
) (component.MetricsExporter, error) {
92-
9392
exp, err := New(config, params)
9493
if err != nil {
9594
return nil, err
@@ -105,12 +104,23 @@ func NewEmfExporter(
105104
}
106105

107106
func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) {
107+
rms := md.ResourceMetrics()
108+
labels := map[string]string{}
109+
for i := 0; i < rms.Len(); i++ {
110+
rm := rms.At(i)
111+
am := rm.Resource().Attributes()
112+
if am.Len() > 0 {
113+
am.ForEach(func(k string, v pdata.AttributeValue) {
114+
labels[k] = v.StringVal()
115+
})
116+
}
117+
}
118+
emf.logger.Info("Start processing resource metrics", zap.Any("labels", labels))
119+
108120
groupedMetrics := make(map[interface{}]*GroupedMetric)
109121
expConfig := emf.config.(*Config)
110122
defaultLogStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
111123

112-
rms := md.ResourceMetrics()
113-
114124
for i := 0; i < rms.Len(); i++ {
115125
rm := rms.At(i)
116126
emf.metricTranslator.translateOTelToGroupedMetric(&rm, groupedMetrics, expConfig)
@@ -133,14 +143,23 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (dr
133143
err = wrapErrorIfBadRequest(&returnError)
134144
return
135145
}
136-
returnError = pusher.ForceFlush()
137-
if returnError != nil {
138-
err = wrapErrorIfBadRequest(&returnError)
139-
return
146+
}
147+
}
148+
149+
for _, pusher := range emf.listPushers() {
150+
returnError := pusher.ForceFlush()
151+
if returnError != nil {
152+
//TODO now we only have one pusher, so it's ok to return after first error occurred
153+
err = wrapErrorIfBadRequest(&returnError)
154+
if err != nil {
155+
emf.logger.Error("Error force flushing logs. Skipping to next pusher.", zap.Error(err))
140156
}
157+
return
141158
}
142159
}
143160

161+
emf.logger.Info("Finish processing resource metrics", zap.Any("labels", labels))
162+
144163
return
145164
}
146165

@@ -163,6 +182,19 @@ func (emf *emfExporter) getPusher(logGroup, logStream string) Pusher {
163182
return pusher
164183
}
165184

185+
func (emf *emfExporter) listPushers() []Pusher {
186+
emf.pusherMapLock.Lock()
187+
defer emf.pusherMapLock.Unlock()
188+
189+
pushers := []Pusher{}
190+
for _, pusherMap := range emf.groupStreamToPusherMap {
191+
for _, pusher := range pusherMap {
192+
pushers = append(pushers, pusher)
193+
}
194+
}
195+
return pushers
196+
}
197+
166198
func (emf *emfExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
167199
exporterCtx := obsreport.ExporterContext(ctx, "emf.exporterFullName")
168200

@@ -172,20 +204,12 @@ func (emf *emfExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) er
172204

173205
// Shutdown stops the exporter and is invoked during shutdown.
174206
func (emf *emfExporter) Shutdown(ctx context.Context) error {
175-
emf.pusherMapLock.Lock()
176-
defer emf.pusherMapLock.Unlock()
177-
178-
var err error
179-
for _, streamToPusherMap := range emf.groupStreamToPusherMap {
180-
for _, pusher := range streamToPusherMap {
181-
if pusher != nil {
182-
returnError := pusher.ForceFlush()
183-
if returnError != nil {
184-
err = wrapErrorIfBadRequest(&returnError)
185-
}
186-
if err != nil {
187-
emf.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next pusher.", zap.Error(err))
188-
}
207+
for _, pusher := range emf.listPushers() {
208+
returnError := pusher.ForceFlush()
209+
if returnError != nil {
210+
err := wrapErrorIfBadRequest(&returnError)
211+
if err != nil {
212+
emf.logger.Error("Error when gracefully shutting down emf_exporter. Skipping to next pusher.", zap.Error(err))
189213
}
190214
}
191215
}

0 commit comments

Comments
 (0)