@@ -2,7 +2,7 @@ package ebpf
22
33import (
44 "bytes"
5- gocontext "context"
5+ "context"
66 "encoding/binary"
77 "fmt"
88 "strconv"
@@ -11,6 +11,7 @@ import (
1111
1212 "github.com/aquasecurity/tracee/pkg/bufferdecoder"
1313 "github.com/aquasecurity/tracee/pkg/events"
14+ "github.com/aquasecurity/tracee/pkg/events/derive"
1415 "github.com/aquasecurity/tracee/types/trace"
1516)
1617
@@ -19,7 +20,7 @@ import (
1920const maxStackDepth int = 20
2021
2122// handleEvents is a high-level function that starts all operations related to events processing
22- func (t * Tracee ) handleEvents (ctx gocontext .Context ) {
23+ func (t * Tracee ) handleEvents (ctx context .Context ) {
2324 var errcList []<- chan error
2425
2526 // Source pipeline stage.
@@ -87,7 +88,7 @@ func (t *Tracee) handleEvents(ctx gocontext.Context) {
8788// 3) create an internal, to tracee-ebpf, buffer based on the node size.
8889
8990// queueEvents implements an internal FIFO queue for caching events
90- func (t * Tracee ) queueEvents (ctx gocontext .Context , in <- chan * trace.Event ) (chan * trace.Event , chan error ) {
91+ func (t * Tracee ) queueEvents (ctx context .Context , in <- chan * trace.Event ) (chan * trace.Event , chan error ) {
9192 out := make (chan * trace.Event , 10000 )
9293 errc := make (chan error , 1 )
9394 done := make (chan struct {}, 1 )
@@ -129,7 +130,7 @@ func (t *Tracee) queueEvents(ctx gocontext.Context, in <-chan *trace.Event) (cha
129130}
130131
131132// decodeEvents read the events received from the BPF programs and parse it into trace.Event type
132- func (t * Tracee ) decodeEvents (outerCtx gocontext .Context ) (<- chan * trace.Event , <- chan error ) {
133+ func (t * Tracee ) decodeEvents (outerCtx context .Context ) (<- chan * trace.Event , <- chan error ) {
133134 out := make (chan * trace.Event , 10000 )
134135 errc := make (chan error , 1 )
135136 go func () {
@@ -234,7 +235,7 @@ func parseContextFlags(flags uint32) trace.ContextFlags {
234235 }
235236}
236237
237- func (t * Tracee ) processEvents (ctx gocontext .Context , in <- chan * trace.Event ) (<- chan * trace.Event , <- chan error ) {
238+ func (t * Tracee ) processEvents (ctx context .Context , in <- chan * trace.Event ) (<- chan * trace.Event , <- chan error ) {
238239 out := make (chan * trace.Event , 10000 )
239240 errc := make (chan error , 1 )
240241 go func () {
@@ -270,7 +271,41 @@ func (t *Tracee) processEvents(ctx gocontext.Context, in <-chan *trace.Event) (<
270271 return out , errc
271272}
272273
273- func (t * Tracee ) sinkEvents (ctx gocontext.Context , in <- chan * trace.Event ) <- chan error {
274+ // deriveEvents is the derivation pipeline stage
275+ func (t * Tracee ) deriveEvents (ctx context.Context , in <- chan * trace.Event ) (<- chan * trace.Event , <- chan error ) {
276+ out := make (chan * trace.Event )
277+ errc := make (chan error , 1 )
278+
279+ go func () {
280+ defer close (out )
281+ defer close (errc )
282+
283+ for {
284+ select {
285+ case event := <- in :
286+ out <- event
287+
288+ // Derive event before parse its arguments
289+ derivatives , errors := derive .DeriveEvent (* event , t .eventDerivations )
290+
291+ for _ , err := range errors {
292+ t .handleError (err )
293+ }
294+
295+ for _ , derivative := range derivatives {
296+ out <- & derivative
297+ }
298+
299+ case <- ctx .Done ():
300+ return
301+ }
302+ }
303+ }()
304+
305+ return out , errc
306+ }
307+
308+ func (t * Tracee ) sinkEvents (ctx context.Context , in <- chan * trace.Event ) <- chan error {
274309 errc := make (chan error , 1 )
275310
276311 go func () {
0 commit comments