Skip to content

Commit 5f765aa

Browse files
enrich: add queue cleaner goroutine (#2084)
Instead of running cgroup_rmdir cleanup logic right away, cleanup logic is queued similar to the scheduled enrichment tick. If the queue is not ready to be cleaned yet (was not fully enriched) cleanup will be rescheduled.
1 parent f396d91 commit 5f765aa

File tree

1 file changed

+29
-30
lines changed

1 file changed

+29
-30
lines changed

pkg/ebpf/events_enrich.go

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
6565
enrichInfo := make(map[uint64]*enrichResult)
6666
// 1 queue per cgroupId
6767
queues := make(map[uint64]chan *trace.Event)
68-
// scheduler queue
68+
// scheduler queues
6969
queueReady := make(chan uint64, queueReadySize)
70+
queueClean := make(chan uint64, queueReadySize)
7071

7172
// queues map writer
7273
go func() {
@@ -88,36 +89,8 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
8889
}
8990
// CgroupRmdir: clean up remaining events and maps
9091
if eventID == events.CgroupRmdir {
91-
var eInfo *enrichResult
9292
cgroupId, _ = parse.ArgUint64Val(event, "cgroup_id")
93-
94-
bLock.RLock()
95-
if i, ok := enrichInfo[cgroupId]; ok {
96-
eInfo = i
97-
}
98-
if queue, ok := queues[cgroupId]; ok {
99-
// clean up queue if needed
100-
if len(queue) > 0 {
101-
for evt := range queue {
102-
if evt.ContainerImage == "" && eInfo != nil {
103-
enrichEvent(evt, eInfo.result)
104-
}
105-
out <- evt
106-
// break here if queue is empty otherwise this loop waits forever
107-
if len(queue) < 1 {
108-
break
109-
}
110-
}
111-
}
112-
close(queues[cgroupId])
113-
}
114-
bLock.RUnlock() // give de-queue events a chance
115-
bLock.Lock()
116-
delete(enrichDone, cgroupId)
117-
delete(enrichInfo, cgroupId)
118-
delete(queues, cgroupId)
119-
bLock.Unlock()
120-
93+
queueClean <- cgroupId
12194
continue
12295
}
12396
// make sure a queue channel exists for this cgroupId
@@ -178,6 +151,32 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
178151
}
179152
}()
180153

154+
// queues cleaner
155+
go func() {
156+
for {
157+
select {
158+
case cgroupId := <-queueClean:
159+
bLock.Lock()
160+
if queue, ok := queues[cgroupId]; ok {
161+
// if queue is still full reschedule cleanup
162+
if len(queue) > 0 {
163+
queueClean <- cgroupId
164+
} else {
165+
close(queue)
166+
// start queue cleanup
167+
delete(enrichDone, cgroupId)
168+
delete(enrichInfo, cgroupId)
169+
delete(queues, cgroupId)
170+
}
171+
}
172+
bLock.Unlock()
173+
174+
case <-ctx.Done():
175+
return
176+
}
177+
}
178+
}()
179+
181180
return out, errc
182181
}
183182

0 commit comments

Comments
 (0)