Skip to content

Commit a00b4c9

Browse files
authored
[receiver/libhoney] Fix msgpack response object (#40414)
1 parent 6b9e959 commit a00b4c9

File tree

7 files changed

+117
-28
lines changed

7 files changed

+117
-28
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
change_type: bug_fix
2+
component: libhoneyreceiver
3+
note: Fix response encoding for msgpack
4+
issues: [40413]
5+
change_logs: [user]
6+
subtext: |
7+
The libhoneyreceiver now correctly encodes the response for msgpack.
8+
This fixes the issue where the response was not being encoded correctly.

receiver/libhoneyreceiver/encoder/encoder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
const (
1717
PbContentType = "application/x-protobuf"
1818
JSONContentType = "application/json"
19-
MsgpackContentType = "application/x-msgpack"
19+
MsgpackContentType = "application/msgpack"
2020
)
2121

2222
var (

receiver/libhoneyreceiver/internal/libhoneyevent/libhoneyevent.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,18 @@ func (l *LibhoneyEvent) SignalType(logger zap.Logger) string {
104104
case "link":
105105
return "span_link"
106106
}
107-
logger.Warn("invalid annotation type", zap.String("meta.annotation_type", atype.(string)))
107+
logger.Debug("invalid annotation type", zap.String("meta.annotation_type", atype.(string)))
108108
return "span"
109109
}
110110
return "span"
111111
case "log":
112112
return "log"
113113
default:
114-
logger.Warn("invalid meta.signal_type", zap.String("meta.signal_type", sig.(string)))
114+
logger.Debug("invalid meta.signal_type", zap.String("meta.signal_type", sig.(string)))
115115
return "log"
116116
}
117117
}
118-
logger.Warn("missing meta.signal_type and meta.annotation_type")
118+
logger.Debug("missing meta.signal_type and meta.annotation_type")
119119
return "log"
120120
}
121121

@@ -251,7 +251,7 @@ func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *
251251
case bool:
252252
newLog.Attributes().PutBool(k, v)
253253
default:
254-
logger.Warn("Span data type issue", zap.Int64("timestamp", timeNs), zap.String("key", k))
254+
logger.Debug("Span data type issue", zap.Int64("timestamp", timeNs), zap.String("key", k))
255255
}
256256
}
257257
return nil
@@ -378,7 +378,7 @@ func (l *LibhoneyEvent) ToPTraceSpan(newSpan *ptrace.Span, alreadyUsedFields *[]
378378
case bool:
379379
newSpan.Attributes().PutBool(k, v)
380380
default:
381-
logger.Warn("Span data type issue", zap.String("trace.trace_id", newSpan.TraceID().String()), zap.String("trace.span_id", newSpan.SpanID().String()), zap.String("key", k))
381+
logger.Debug("Span data type issue", zap.String("trace.trace_id", newSpan.TraceID().String()), zap.String("trace.span_id", newSpan.SpanID().String()), zap.String("key", k))
382382
}
383383
}
384384
return nil

receiver/libhoneyreceiver/internal/parser/parser.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,16 +52,16 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
5252
} // seed a default
5353

5454
alreadyUsedFields := []string{cfg.Resources.ServiceName, cfg.Scopes.LibraryName, cfg.Scopes.LibraryVersion}
55-
alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.Name,
55+
alreadyUsedTraceFields := []string{
56+
cfg.Attributes.Name,
5657
cfg.Attributes.TraceID, cfg.Attributes.ParentID, cfg.Attributes.SpanID,
5758
cfg.Attributes.Error, cfg.Attributes.SpanKind,
58-
)
59-
alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.DurationFields...)
59+
}
6060

6161
for _, lhe := range lhes {
6262
parentID, err := lhe.GetParentID(cfg.Attributes.ParentID)
6363
if err != nil {
64-
logger.Warn("parent id not found")
64+
logger.Debug("parent id not found")
6565
}
6666

6767
action := lhe.SignalType(logger)
@@ -70,6 +70,8 @@ func ToPdata(dataset string, lhes []libhoneyevent.LibhoneyEvent, cfg libhoneyeve
7070
spanService, _ := lhe.GetService(cfg, &foundServices, dataset)
7171
spanScopeKey, _ := lhe.GetScope(cfg, &foundScopes, spanService) // adds a new found scope if needed
7272
newSpan := foundScopes.Scope[spanScopeKey].ScopeSpans.AppendEmpty()
73+
alreadyUsedFields = append(alreadyUsedFields, alreadyUsedTraceFields...)
74+
alreadyUsedFields = append(alreadyUsedFields, cfg.Attributes.DurationFields...)
7375
err := lhe.ToPTraceSpan(&newSpan, &alreadyUsedFields, cfg, logger)
7476
if err != nil {
7577
logger.Warn("span could not be converted from libhoney to ptrace", zap.String("span.object", lhe.DebugString()))
@@ -160,7 +162,7 @@ func addSpanEventsToSpan(sp ptrace.Span, events []libhoneyevent.LibhoneyEvent, a
160162
case bool:
161163
newEvent.Attributes().PutBool(lkey, lval)
162164
default:
163-
logger.Warn("SpanEvent data type issue",
165+
logger.Debug("SpanEvent data type issue",
164166
zap.String("trace.trace_id", sp.TraceID().String()),
165167
zap.String("trace.span_id", sp.SpanID().String()),
166168
zap.String("key", lkey))
@@ -176,7 +178,7 @@ func addSpanLinksToSpan(sp ptrace.Span, links []libhoneyevent.LibhoneyEvent, alr
176178
if linkTraceStr, ok := spl.Data["trace.link.trace_id"]; ok {
177179
tidByteArray, err := hex.DecodeString(linkTraceStr.(string))
178180
if err != nil {
179-
logger.Warn("span link invalid",
181+
logger.Debug("span link invalid",
180182
zap.String("missing.attribute", "trace.link.trace_id"),
181183
zap.String("span link contents", spl.DebugString()))
182184
continue
@@ -186,7 +188,7 @@ func addSpanLinksToSpan(sp ptrace.Span, links []libhoneyevent.LibhoneyEvent, alr
186188
}
187189
newLink.SetTraceID(pcommon.TraceID(tidByteArray))
188190
} else {
189-
logger.Warn("span link missing attributes",
191+
logger.Debug("span link missing attributes",
190192
zap.String("missing.attribute", "trace.link.trace_id"),
191193
zap.String("span link contents", spl.DebugString()))
192194
continue
@@ -195,7 +197,7 @@ func addSpanLinksToSpan(sp ptrace.Span, links []libhoneyevent.LibhoneyEvent, alr
195197
if linkSpanStr, ok := spl.Data["trace.link.span_id"]; ok {
196198
sidByteArray, err := hex.DecodeString(linkSpanStr.(string))
197199
if err != nil {
198-
logger.Warn("span link invalid",
200+
logger.Debug("span link invalid",
199201
zap.String("missing.attribute", "trace.link.span_id"),
200202
zap.String("span link contents", spl.DebugString()))
201203
continue
@@ -205,7 +207,7 @@ func addSpanLinksToSpan(sp ptrace.Span, links []libhoneyevent.LibhoneyEvent, alr
205207
}
206208
newLink.SetSpanID(pcommon.SpanID(sidByteArray))
207209
} else {
208-
logger.Warn("span link missing attributes",
210+
logger.Debug("span link missing attributes",
209211
zap.String("missing.attribute", "trace.link.span_id"),
210212
zap.String("span link contents", spl.DebugString()))
211213
continue
@@ -234,7 +236,7 @@ func addSpanLinksToSpan(sp ptrace.Span, links []libhoneyevent.LibhoneyEvent, alr
234236
case bool:
235237
newLink.Attributes().PutBool(lkey, lval)
236238
default:
237-
logger.Warn("SpanLink data type issue",
239+
logger.Debug("SpanLink data type issue",
238240
zap.String("trace.trace_id", sp.TraceID().String()),
239241
zap.String("trace.span_id", sp.SpanID().String()),
240242
zap.String("key", lkey))
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package response // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/response"
5+
6+
import "net/http"
7+
8+
type ResponseInBatch struct {
9+
ErrorStr string `json:"error,omitempty"`
10+
Status int `json:"status,omitempty"`
11+
}
12+
13+
func MakeResponse(eventErrs []int) []ResponseInBatch {
14+
if len(eventErrs) == 0 {
15+
return []ResponseInBatch{{Status: http.StatusAccepted}}
16+
}
17+
18+
responses := make([]ResponseInBatch, len(eventErrs))
19+
for i, eventErr := range eventErrs {
20+
responses[i] = ResponseInBatch{
21+
ErrorStr: "error",
22+
Status: eventErr,
23+
}
24+
}
25+
return responses
26+
}

receiver/libhoneyreceiver/receiver.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"net/http"
1616
"strings"
1717
"sync"
18+
"time"
1819

1920
"github.com/vmihailenco/msgpack/v5"
2021
"go.opentelemetry.io/collector/component"
@@ -26,8 +27,10 @@ import (
2627

2728
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/errorutil"
2829
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/encoder"
30+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime"
2931
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"
3032
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/parser"
33+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/response"
3134
)
3235

3336
type libhoneyReceiver struct {
@@ -226,6 +229,21 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque
226229
if err != nil {
227230
r.settings.Logger.Info("messagepack decoding failed")
228231
}
232+
// Post-process msgpack events to ensure timestamps are set
233+
for i := range libhoneyevents {
234+
if libhoneyevents[i].MsgPackTimestamp == nil {
235+
if libhoneyevents[i].Time != "" {
236+
// Parse the time string and set MsgPackTimestamp
237+
propertime := eventtime.GetEventTime(libhoneyevents[i].Time)
238+
libhoneyevents[i].MsgPackTimestamp = &propertime
239+
} else {
240+
// No time field, use current time
241+
tnow := time.Now()
242+
libhoneyevents[i].MsgPackTimestamp = &tnow
243+
libhoneyevents[i].Time = eventtime.GetEventTimeDefaultString()
244+
}
245+
}
246+
}
229247
if len(libhoneyevents) > 0 {
230248
r.settings.Logger.Debug("Decoding with msgpack worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time))
231249
r.settings.Logger.Debug("event zero", zap.String("event.data", libhoneyevents[0].DebugString()))
@@ -238,6 +256,8 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque
238256
if len(libhoneyevents) > 0 {
239257
r.settings.Logger.Debug("Decoding with json worked", zap.Time("timestamp.first.msgpacktimestamp", *libhoneyevents[0].MsgPackTimestamp), zap.String("timestamp.first.time", libhoneyevents[0].Time))
240258
}
259+
default:
260+
r.settings.Logger.Info("unsupported content type", zap.String("content-type", req.Header.Get("Content-Type")))
241261
}
242262

243263
otlpLogs, otlpTraces := parser.ToPdata(dataset, libhoneyevents, r.cfg.FieldMapConfig, *r.settings.Logger)
@@ -261,8 +281,28 @@ func (r *libhoneyReceiver) handleEvent(resp http.ResponseWriter, req *http.Reque
261281
return
262282
}
263283

264-
noErrors := []byte(`{"errors":[]}`)
265-
writeResponse(resp, enc.ContentType(), http.StatusOK, noErrors)
284+
// return clean response if no errors above
285+
noErrors := response.MakeResponse([]int{})
286+
287+
var responseBody []byte
288+
var contentType string
289+
290+
switch enc.ContentType() {
291+
case encoder.MsgpackContentType:
292+
// For msgpack requests, return msgpack response
293+
responseBody, err = msgpack.Marshal(noErrors)
294+
contentType = encoder.MsgpackContentType
295+
default:
296+
// For JSON requests, return JSON response
297+
responseBody, err = json.Marshal(noErrors)
298+
contentType = encoder.JSONContentType
299+
}
300+
301+
if err != nil {
302+
errorutil.HTTPError(resp, err)
303+
return
304+
}
305+
writeResponse(resp, contentType, http.StatusOK, responseBody)
266306
}
267307

268308
func readContentType(resp http.ResponseWriter, req *http.Request) (encoder.Encoder, bool) {

receiver/libhoneyreceiver/receiver_test.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"
2424
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/metadata"
25+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/response"
2526
)
2627

2728
func TestNewLibhoneyReceiver(t *testing.T) {
@@ -89,11 +90,12 @@ func TestLibhoneyReceiver_Start(t *testing.T) {
8990
func TestLibhoneyReceiver_HandleEvent(t *testing.T) {
9091
now := time.Now()
9192
tests := []struct {
92-
name string
93-
events []libhoneyevent.LibhoneyEvent
94-
contentType string
95-
expectedStatus int
96-
wantError bool
93+
name string
94+
events []libhoneyevent.LibhoneyEvent
95+
contentType string
96+
expectedStatus int
97+
expectedResponse []response.ResponseInBatch
98+
wantError bool
9799
}{
98100
{
99101
name: "valid_json_event",
@@ -109,6 +111,9 @@ func TestLibhoneyReceiver_HandleEvent(t *testing.T) {
109111
},
110112
contentType: "application/json",
111113
expectedStatus: http.StatusOK,
114+
expectedResponse: []response.ResponseInBatch{
115+
{Status: http.StatusAccepted},
116+
},
112117
},
113118
{
114119
name: "valid_msgpack_event",
@@ -126,11 +131,12 @@ func TestLibhoneyReceiver_HandleEvent(t *testing.T) {
126131
expectedStatus: http.StatusOK,
127132
},
128133
{
129-
name: "invalid_content_type",
130-
events: []libhoneyevent.LibhoneyEvent{},
131-
contentType: "text/plain",
132-
expectedStatus: http.StatusUnsupportedMediaType,
133-
wantError: true,
134+
name: "invalid_content_type",
135+
events: []libhoneyevent.LibhoneyEvent{},
136+
contentType: "text/plain",
137+
expectedStatus: http.StatusUnsupportedMediaType,
138+
wantError: true,
139+
expectedResponse: nil,
134140
},
135141
}
136142

@@ -168,6 +174,13 @@ func TestLibhoneyReceiver_HandleEvent(t *testing.T) {
168174
assert.Eventually(t, func() bool {
169175
return sink.LogRecordCount() > 0
170176
}, time.Second, 10*time.Millisecond)
177+
assert.Equal(t, tt.contentType, resp.Header.Get("Content-Type"))
178+
}
179+
if tt.expectedResponse != nil {
180+
var actualResponse []response.ResponseInBatch
181+
err = json.NewDecoder(resp.Body).Decode(&actualResponse)
182+
assert.NoError(t, err)
183+
assert.Equal(t, tt.expectedResponse, actualResponse)
171184
}
172185
})
173186
}

0 commit comments

Comments
 (0)