@@ -97,6 +97,12 @@ func (v *vpcFlowLogUnmarshaler) UnmarshalLogs(content []byte) (plog.Logs, error)
97
97
}
98
98
}
99
99
100
+ // resourceKey stores the account id and region
101
+ // of the flow logs. All log lines inside the
102
+ // same S3 file come from the same account and
103
+ // region.
104
+ //
105
+ // See https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-s3-path.html.
100
106
type resourceKey struct {
101
107
accountID string
102
108
region string
@@ -111,10 +117,11 @@ func (v *vpcFlowLogUnmarshaler) unmarshalPlainTextLogs(reader io.Reader) (plog.L
111
117
fields = strings .Split (firstLine , " " )
112
118
}
113
119
114
- scopeLogsByResource := map [resourceKey ]plog.ScopeLogs {}
120
+ logs , resourceLogs , scopeLogs := v .createLogs ()
121
+ key := & resourceKey {}
115
122
for scanner .Scan () {
116
123
line := scanner .Text ()
117
- if err := v .addToLogs (scopeLogsByResource , fields , line ); err != nil {
124
+ if err := v .addToLogs (key , scopeLogs , fields , line ); err != nil {
118
125
return plog.Logs {}, err
119
126
}
120
127
}
@@ -123,70 +130,64 @@ func (v *vpcFlowLogUnmarshaler) unmarshalPlainTextLogs(reader io.Reader) (plog.L
123
130
return plog.Logs {}, fmt .Errorf ("error reading log line: %w" , err )
124
131
}
125
132
126
- return v .createLogs (scopeLogsByResource ), nil
133
+ v .setResourceAttributes (key , resourceLogs )
134
+ return logs , nil
127
135
}
128
136
129
- // createLogs based on the scopeLogsByResource map
130
- func (v * vpcFlowLogUnmarshaler ) createLogs (scopeLogsByResource map [ resourceKey ] plog.ScopeLogs ) plog.Logs {
137
+ // createLogs with the expected fields for the scope logs
138
+ func (v * vpcFlowLogUnmarshaler ) createLogs () ( plog.Logs , plog.ResourceLogs , plog. ScopeLogs ) {
131
139
logs := plog .NewLogs ()
140
+ resourceLogs := logs .ResourceLogs ().AppendEmpty ()
141
+ scopeLogs := resourceLogs .ScopeLogs ().AppendEmpty ()
142
+ scopeLogs .Scope ().SetName (metadata .ScopeName )
143
+ scopeLogs .Scope ().SetVersion (v .buildInfo .Version )
144
+ return logs , resourceLogs , scopeLogs
145
+ }
132
146
133
- for key , scopeLogs := range scopeLogsByResource {
134
- rl := logs .ResourceLogs ().AppendEmpty ()
135
- attr := rl .Resource ().Attributes ()
136
- attr .PutStr (conventions .AttributeCloudProvider , conventions .AttributeCloudProviderAWS )
137
- if key .accountID != "" {
138
- attr .PutStr (conventions .AttributeCloudAccountID , key .accountID )
139
- }
140
- if key .region != "" {
141
- attr .PutStr (conventions .AttributeCloudRegion , key .region )
142
- }
143
- scopeLogs .MoveTo (rl .ScopeLogs ().AppendEmpty ())
147
+ // setResourceAttributes based on the resourceKey
148
+ func (v * vpcFlowLogUnmarshaler ) setResourceAttributes (key * resourceKey , logs plog.ResourceLogs ) {
149
+ attr := logs .Resource ().Attributes ()
150
+ attr .PutStr (conventions .AttributeCloudProvider , conventions .AttributeCloudProviderAWS )
151
+ if key .accountID != "" {
152
+ attr .PutStr (conventions .AttributeCloudAccountID , key .accountID )
153
+ }
154
+ if key .region != "" {
155
+ attr .PutStr (conventions .AttributeCloudRegion , key .region )
144
156
}
157
+ }
145
158
146
- return logs
159
+ // address stores the four fields related to the address
160
+ // of a VPC flow log: srcaddr, pkt-srcaddr, dstaddr, and
161
+ // pkt-dstaddr. We save these fields in a struct, so we
162
+ // can use the right naming conventions in the end.
163
+ //
164
+ // See https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-nat.
165
+ type address struct {
166
+ source string
167
+ pktSource string
168
+ destination string
169
+ pktDestination string
147
170
}
148
171
149
172
// addToLogs parses the log line and creates
150
173
// a new record log. The record log is added
151
174
// to the scope logs of the resource identified
152
175
// by the resourceKey created from the values.
153
176
func (v * vpcFlowLogUnmarshaler ) addToLogs (
154
- scopeLogsByResource map [resourceKey ]plog.ScopeLogs ,
177
+ key * resourceKey ,
178
+ scopeLogs plog.ScopeLogs ,
155
179
fields []string ,
156
180
logLine string ,
157
181
) error {
158
- // first line includes the fields
159
- // TODO Replace with an iterator starting from go 1.24:
160
- // https://pkg.go.dev/strings#FieldsSeq
161
- nFields := len (fields )
162
- nValues := strings .Count (logLine , " " ) + 1
163
- if nFields != nValues {
164
- return fmt .Errorf ("expect %d fields per log line, got log line with %d fields" , nFields , nValues )
165
- }
166
-
167
- // create new key for resource and new
168
- // log record to add to the scope of logs
169
- // of the resource
170
- key := & resourceKey {}
171
182
record := plog .NewLogRecord ()
172
183
173
- // There are 4 fields for the addresses: srcaddr, pkt-srcaddr,
174
- // dstaddr, pkt-dstaddr. We will save these fields in a
175
- // map, so we can use the right conventions in the end.
176
- //
177
- // See https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-nat.
178
- ips := make (map [string ]string , 4 )
179
-
180
- start := 0
184
+ addr := & address {}
181
185
for _ , field := range fields {
182
- var value string
183
- end := strings .Index (logLine [start :], " " )
184
- if end == - 1 {
185
- value = logLine [start :]
186
- } else {
187
- value = logLine [start : start + end ]
188
- start += end + 1 // skip the space
186
+ if logLine == "" {
187
+ return errors .New ("log line has less fields than the ones expected" )
189
188
}
189
+ var value string
190
+ value , logLine , _ = strings .Cut (logLine , " " )
190
191
191
192
if value == "-" {
192
193
// If a field is not applicable or could not be computed for a
@@ -201,7 +202,7 @@ func (v *vpcFlowLogUnmarshaler) addToLogs(
201
202
continue
202
203
}
203
204
204
- found , err := handleField (field , value , record , ips , key )
205
+ found , err := handleField (field , value , record , addr , key )
205
206
if err != nil {
206
207
return err
207
208
}
@@ -213,70 +214,51 @@ func (v *vpcFlowLogUnmarshaler) addToLogs(
213
214
}
214
215
}
215
216
216
- // get the address fields
217
- addresses := v .handleAddresses (ips )
218
- for field , value := range addresses {
219
- record .Attributes ().PutStr (field , value )
217
+ if logLine != "" {
218
+ return errors .New ("log line has more fields than the ones expected" )
220
219
}
221
220
222
- scopeLogs := v .getScopeLogs (* key , scopeLogsByResource )
221
+ // add the address fields with the correct conventions
222
+ // to the log record
223
+ v .handleAddresses (addr , record )
223
224
rScope := scopeLogs .LogRecords ().AppendEmpty ()
224
225
record .MoveTo (rScope )
225
226
226
227
return nil
227
228
}
228
229
229
- // handleAddresses creates a new map where the original field
230
- // names will be the known conventions for the fields
231
- func ( v * vpcFlowLogUnmarshaler ) handleAddresses ( addresses map [ string ] string ) map [ string ] string {
232
- // max is 3 fields, see example in
230
+ // handleAddresses creates adds the addresses to the log record
231
+ func ( v * vpcFlowLogUnmarshaler ) handleAddresses ( addr * address , record plog. LogRecord ) {
232
+ localAddrSet := false
233
+ // see example in
233
234
// https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html#flow-log-example-nat
234
- recordAttr := make (map [string ]string , 3 )
235
- srcaddr , foundSrc := addresses ["srcaddr" ]
236
- pktSrcaddr , foundSrcPkt := addresses ["pkt-srcaddr" ]
237
- if ! foundSrcPkt && foundSrc {
235
+ if addr .pktSource == "" && addr .source != "" {
238
236
// there is no middle layer, assume "srcaddr" field
239
237
// corresponds to the original source address.
240
- recordAttr [ conventions .AttributeSourceAddress ] = srcaddr
241
- } else if foundSrcPkt && foundSrc {
242
- recordAttr [ conventions .AttributeSourceAddress ] = pktSrcaddr
243
- if srcaddr != pktSrcaddr {
238
+ record . Attributes (). PutStr ( conventions .AttributeSourceAddress , addr . source )
239
+ } else if addr . pktSource != "" && addr . source != "" {
240
+ record . Attributes (). PutStr ( conventions .AttributeSourceAddress , addr . pktSource )
241
+ if addr . pktSource != addr . source {
244
242
// srcaddr is the middle layer
245
- recordAttr [conventions .AttributeNetworkPeerAddress ] = srcaddr
243
+ record .Attributes ().PutStr (conventions .AttributeNetworkLocalAddress , addr .source )
244
+ localAddrSet = true
246
245
}
247
246
}
248
247
249
- dstaddr , foundDst := addresses ["dstaddr" ]
250
- pktDstaddr , foundDstPkt := addresses ["pkt-dstaddr" ]
251
- if ! foundDstPkt && foundDst {
248
+ if addr .pktDestination == "" && addr .destination != "" {
252
249
// there is no middle layer, assume "dstaddr" field
253
250
// corresponds to the original destination address.
254
- recordAttr [ conventions .AttributeDestinationAddress ] = dstaddr
255
- } else if foundDstPkt && foundDst {
256
- recordAttr [ conventions .AttributeDestinationAddress ] = pktDstaddr
257
- if pktDstaddr != dstaddr {
258
- if _ , found := recordAttr [ conventions . AttributeNetworkPeerAddress ]; found {
251
+ record . Attributes (). PutStr ( conventions .AttributeDestinationAddress , addr . destination )
252
+ } else if addr . pktDestination != "" && addr . destination != "" {
253
+ record . Attributes (). PutStr ( conventions .AttributeDestinationAddress , addr . pktDestination )
254
+ if addr . pktDestination != addr . destination {
255
+ if localAddrSet {
259
256
v .logger .Warn ("unexpected: srcaddr, dstaddr, pkt-srcaddr and pkt-dstaddr are all different" )
260
257
}
261
258
// dstaddr is the middle layer
262
- recordAttr [ conventions .AttributeNetworkPeerAddress ] = dstaddr
259
+ record . Attributes (). PutStr ( conventions .AttributeNetworkLocalAddress , addr . destination )
263
260
}
264
261
}
265
-
266
- return recordAttr
267
- }
268
-
269
- // getScopeLogs for the given key. If it does not exist yet,
270
- // create new scope logs, and add the key to the logs map.
271
- func (v * vpcFlowLogUnmarshaler ) getScopeLogs (key resourceKey , logs map [resourceKey ]plog.ScopeLogs ) plog.ScopeLogs {
272
- scopeLogs , ok := logs [key ]
273
- if ! ok {
274
- scopeLogs = plog .NewScopeLogs ()
275
- scopeLogs .Scope ().SetName (metadata .ScopeName )
276
- scopeLogs .Scope ().SetVersion (v .buildInfo .Version )
277
- logs [key ] = scopeLogs
278
- }
279
- return scopeLogs
280
262
}
281
263
282
264
// handleField analyzes the given field and it either
@@ -287,7 +269,7 @@ func handleField(
287
269
field string ,
288
270
value string ,
289
271
record plog.LogRecord ,
290
- ips map [ string ] string ,
272
+ addr * address ,
291
273
key * resourceKey ,
292
274
) (bool , error ) {
293
275
// convert string to number
@@ -312,9 +294,19 @@ func handleField(
312
294
313
295
switch field {
314
296
// TODO Add support for ECS fields
315
- case "srcaddr" , "pkt-srcaddr" , "dstaddr" , "pkt-dstaddr" :
297
+ case "srcaddr" :
298
+ // handled later
299
+ addr .source = value
300
+ case "pkt-srcaddr" :
301
+ // handled later
302
+ addr .pktSource = value
303
+ case "dstaddr" :
316
304
// handled later
317
- ips [field ] = value
305
+ addr .destination = value
306
+ case "pkt-dstaddr" :
307
+ // handled later
308
+ addr .pktDestination = value
309
+
318
310
case "account-id" :
319
311
key .accountID = value
320
312
case "vpc-id" :
0 commit comments