|
4 | 4 | package libhoneyevent // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/libhoneyevent"
|
5 | 5 |
|
6 | 6 | import (
|
| 7 | + "crypto/rand" |
| 8 | + "encoding/binary" |
| 9 | + "encoding/hex" |
7 | 10 | "encoding/json"
|
8 | 11 | "errors"
|
9 | 12 | "fmt"
|
| 13 | + "hash/fnv" |
10 | 14 | "slices"
|
| 15 | + "strings" |
11 | 16 | "time"
|
12 | 17 |
|
13 | 18 | "go.opentelemetry.io/collector/pdata/pcommon"
|
14 | 19 | "go.opentelemetry.io/collector/pdata/plog"
|
15 | 20 | "go.opentelemetry.io/collector/pdata/ptrace"
|
| 21 | + trc "go.opentelemetry.io/otel/trace" |
16 | 22 | "go.uber.org/zap"
|
17 | 23 |
|
18 | 24 | "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/libhoneyreceiver/internal/eventtime"
|
@@ -87,8 +93,29 @@ func (l *LibhoneyEvent) DebugString() string {
|
87 | 93 | }
|
88 | 94 |
|
89 | 95 | // SignalType returns the type of signal this event represents. Only log is implemented for now.
|
90 |
| -func (l *LibhoneyEvent) SignalType() (string, error) { |
91 |
| - return "log", nil |
| 96 | +func (l *LibhoneyEvent) SignalType(logger zap.Logger) string { |
| 97 | + if sig, ok := l.Data["meta.signal_type"]; ok { |
| 98 | + switch sig { |
| 99 | + case "trace": |
| 100 | + if atype, ok := l.Data["meta.annotation_type"]; ok { |
| 101 | + if atype == "span_event" { |
| 102 | + return "span_event" |
| 103 | + } else if atype == "link" { |
| 104 | + return "span_link" |
| 105 | + } |
| 106 | + logger.Warn("invalid annotation type", zap.String("meta.annotation_type", atype.(string))) |
| 107 | + return "span" |
| 108 | + } |
| 109 | + return "span" |
| 110 | + case "log": |
| 111 | + return "log" |
| 112 | + default: |
| 113 | + logger.Warn("invalid meta.signal_type", zap.String("meta.signal_type", sig.(string))) |
| 114 | + return "log" |
| 115 | + } |
| 116 | + } |
| 117 | + logger.Warn("missing meta.signal_type and meta.annotation_type") |
| 118 | + return "log" |
92 | 119 | }
|
93 | 120 |
|
94 | 121 | // GetService returns the service name from the event or the dataset name if no service name is found.
|
@@ -126,6 +153,36 @@ func (l *LibhoneyEvent) GetScope(fields FieldMapConfig, seen *ScopeHistory, serv
|
126 | 153 | return "libhoney.receiver", errors.New("library name not found")
|
127 | 154 | }
|
128 | 155 |
|
| 156 | +func spanIDFrom(s string) trc.SpanID { |
| 157 | + hash := fnv.New64a() |
| 158 | + hash.Write([]byte(s)) |
| 159 | + n := hash.Sum64() |
| 160 | + sid := trc.SpanID{} |
| 161 | + binary.LittleEndian.PutUint64(sid[:], n) |
| 162 | + return sid |
| 163 | +} |
| 164 | + |
| 165 | +func traceIDFrom(s string) trc.TraceID { |
| 166 | + hash := fnv.New64a() |
| 167 | + hash.Write([]byte(s)) |
| 168 | + n1 := hash.Sum64() |
| 169 | + hash.Write([]byte(s)) |
| 170 | + n2 := hash.Sum64() |
| 171 | + tid := trc.TraceID{} |
| 172 | + binary.LittleEndian.PutUint64(tid[:], n1) |
| 173 | + binary.LittleEndian.PutUint64(tid[8:], n2) |
| 174 | + return tid |
| 175 | +} |
| 176 | + |
| 177 | +func generateAnId(length int) []byte { |
| 178 | + token := make([]byte, length) |
| 179 | + _, err := rand.Read(token) |
| 180 | + if err != nil { |
| 181 | + return []byte{} |
| 182 | + } |
| 183 | + return token |
| 184 | +} |
| 185 | + |
129 | 186 | // SimpleScope is a simple struct to hold the scope data
|
130 | 187 | type SimpleScope struct {
|
131 | 188 | ServiceName string
|
@@ -198,3 +255,130 @@ func (l *LibhoneyEvent) ToPLogRecord(newLog *plog.LogRecord, alreadyUsedFields *
|
198 | 255 | }
|
199 | 256 | return nil
|
200 | 257 | }
|
| 258 | + |
| 259 | +// GetParentID returns the parent id from the event or an error if it's not found |
| 260 | +func (l *LibhoneyEvent) GetParentID(fieldName string) (trc.SpanID, error) { |
| 261 | + if pid, ok := l.Data[fieldName]; ok { |
| 262 | + pid := strings.ReplaceAll(pid.(string), "-", "") |
| 263 | + pidByteArray, err := hex.DecodeString(pid) |
| 264 | + if err == nil { |
| 265 | + if len(pidByteArray) == 32 { |
| 266 | + pidByteArray = pidByteArray[8:24] |
| 267 | + } else if len(pidByteArray) >= 16 { |
| 268 | + pidByteArray = pidByteArray[0:16] |
| 269 | + } |
| 270 | + return trc.SpanID(pidByteArray), nil |
| 271 | + } |
| 272 | + return trc.SpanID{}, errors.New("parent id is not a valid span id") |
| 273 | + } |
| 274 | + return trc.SpanID{}, errors.New("parent id not found") |
| 275 | +} |
| 276 | + |
| 277 | +// ToPTraceSpan converts a LibhoneyEvent to a Pdata Span |
| 278 | +func (l *LibhoneyEvent) ToPTraceSpan(newSpan *ptrace.Span, alreadyUsedFields *[]string, cfg FieldMapConfig, logger zap.Logger) error { |
| 279 | + time_ns := l.MsgPackTimestamp.UnixNano() |
| 280 | + logger.Debug("processing trace with", zap.Int64("timestamp", time_ns)) |
| 281 | + |
| 282 | + var parent_id trc.SpanID |
| 283 | + if pid, ok := l.Data[cfg.Attributes.ParentID]; ok { |
| 284 | + parent_id = spanIDFrom(pid.(string)) |
| 285 | + newSpan.SetParentSpanID(pcommon.SpanID(parent_id)) |
| 286 | + } |
| 287 | + |
| 288 | + duration_ms := 0.0 |
| 289 | + for _, df := range cfg.Attributes.DurationFields { |
| 290 | + if duration, okay := l.Data[df]; okay { |
| 291 | + duration_ms = duration.(float64) |
| 292 | + break |
| 293 | + } |
| 294 | + } |
| 295 | + end_timestamp := time_ns + (int64(duration_ms) * 1000000) |
| 296 | + |
| 297 | + if tid, ok := l.Data[cfg.Attributes.TraceID]; ok { |
| 298 | + tid := strings.ReplaceAll(tid.(string), "-", "") |
| 299 | + tidByteArray, err := hex.DecodeString(tid) |
| 300 | + if err == nil { |
| 301 | + if len(tidByteArray) >= 32 { |
| 302 | + tidByteArray = tidByteArray[0:32] |
| 303 | + } |
| 304 | + newSpan.SetTraceID(pcommon.TraceID(tidByteArray)) |
| 305 | + } else { |
| 306 | + newSpan.SetTraceID(pcommon.TraceID(traceIDFrom(tid))) |
| 307 | + } |
| 308 | + } else { |
| 309 | + newSpan.SetTraceID(pcommon.TraceID(generateAnId(32))) |
| 310 | + } |
| 311 | + |
| 312 | + if sid, ok := l.Data[cfg.Attributes.SpanID]; ok { |
| 313 | + sid := strings.ReplaceAll(sid.(string), "-", "") |
| 314 | + sidByteArray, err := hex.DecodeString(sid) |
| 315 | + if err == nil { |
| 316 | + if len(sidByteArray) == 32 { |
| 317 | + sidByteArray = sidByteArray[8:24] |
| 318 | + } else if len(sidByteArray) >= 16 { |
| 319 | + sidByteArray = sidByteArray[0:16] |
| 320 | + } |
| 321 | + newSpan.SetSpanID(pcommon.SpanID(sidByteArray)) |
| 322 | + } else { |
| 323 | + newSpan.SetSpanID(pcommon.SpanID(spanIDFrom(sid))) |
| 324 | + } |
| 325 | + } else { |
| 326 | + newSpan.SetSpanID(pcommon.SpanID(generateAnId(16))) |
| 327 | + } |
| 328 | + |
| 329 | + newSpan.SetStartTimestamp(pcommon.Timestamp(time_ns)) |
| 330 | + newSpan.SetEndTimestamp(pcommon.Timestamp(end_timestamp)) |
| 331 | + |
| 332 | + if spanName, ok := l.Data[cfg.Attributes.Name]; ok { |
| 333 | + newSpan.SetName(spanName.(string)) |
| 334 | + } |
| 335 | + if spanStatusMessge, ok := l.Data["status_message"]; ok { |
| 336 | + newSpan.Status().SetMessage(spanStatusMessge.(string)) |
| 337 | + } |
| 338 | + newSpan.Status().SetCode(ptrace.StatusCodeUnset) |
| 339 | + |
| 340 | + if _, ok := l.Data[cfg.Attributes.Error]; ok { |
| 341 | + newSpan.Status().SetCode(ptrace.StatusCodeError) |
| 342 | + } |
| 343 | + |
| 344 | + if spanKind, ok := l.Data[cfg.Attributes.SpanKind]; ok { |
| 345 | + switch spanKind.(string) { |
| 346 | + case "server": |
| 347 | + newSpan.SetKind(ptrace.SpanKindServer) |
| 348 | + case "client": |
| 349 | + newSpan.SetKind(ptrace.SpanKindClient) |
| 350 | + case "producer": |
| 351 | + newSpan.SetKind(ptrace.SpanKindProducer) |
| 352 | + case "consumer": |
| 353 | + newSpan.SetKind(ptrace.SpanKindConsumer) |
| 354 | + case "internal": |
| 355 | + newSpan.SetKind(ptrace.SpanKindInternal) |
| 356 | + default: |
| 357 | + newSpan.SetKind(ptrace.SpanKindUnspecified) |
| 358 | + } |
| 359 | + } |
| 360 | + |
| 361 | + newSpan.Attributes().PutInt("SampleRate", int64(l.Samplerate)) |
| 362 | + |
| 363 | + for k, v := range l.Data { |
| 364 | + if slices.Contains(*alreadyUsedFields, k) { |
| 365 | + continue |
| 366 | + } |
| 367 | + switch v := v.(type) { |
| 368 | + case string: |
| 369 | + newSpan.Attributes().PutStr(k, v) |
| 370 | + case int: |
| 371 | + newSpan.Attributes().PutInt(k, int64(v)) |
| 372 | + case int64, int16, int32: |
| 373 | + intv := v.(int64) |
| 374 | + newSpan.Attributes().PutInt(k, intv) |
| 375 | + case float64: |
| 376 | + newSpan.Attributes().PutDouble(k, v) |
| 377 | + case bool: |
| 378 | + newSpan.Attributes().PutBool(k, v) |
| 379 | + default: |
| 380 | + logger.Warn("Span data type issue", zap.String("trace.trace_id", newSpan.TraceID().String()), zap.String("trace.span_id", newSpan.SpanID().String()), zap.String("key", k)) |
| 381 | + } |
| 382 | + } |
| 383 | + return nil |
| 384 | +} |
0 commit comments