Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- (Feature) (Platform) MetaV1 Integration Service
- (Feature) (Platform) Chart Overrides
- (Feature) Parallel Executor

## [1.2.49](https://github.com/arangodb/kube-arangodb/tree/1.2.49) (2025-06-17)
- (Maintenance) Optimize go.mod
Expand Down
15 changes: 14 additions & 1 deletion pkg/deployment/resources/pod_inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/handlers/utils"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter

spec := r.context.GetSpec()
groupSpec := spec.GetServerGroupSpec(group)
coreContainers := spec.GetCoreContainers(group)
coreContainers := getPodCoreContainers(spec.GetCoreContainers(group), pod.Spec.Containers)

if c, ok := memberStatus.Conditions.Get(api.ConditionTypeUpdating); ok {
if v, ok := c.Params[api.ConditionParamContainerUpdatingName]; ok {
Expand Down Expand Up @@ -523,3 +524,15 @@ func removeLabel(labels map[string]string, key string) map[string]string {

return labels
}

func getPodCoreContainers(coreContainers utils.StringList, containers []core.Container) utils.StringList {
r := make(utils.StringList, 0, len(coreContainers))

for _, container := range containers {
if coreContainers.Has(container.Name) {
r = append(r, container.Name)
}
}

return r
}
156 changes: 156 additions & 0 deletions pkg/util/executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//
// DISCLAIMER
//
// Copyright 2025 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package executor

import (
"context"
"sync"
"time"

"github.com/rs/zerolog"

"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)

func Run(ctx context.Context, log logging.Logger, threads int, f RunFunc) error {
h := &handler{
th: NewThreadManager(threads),
completed: make(chan struct{}),
log: log,
}

go h.run(ctx, f)

return h.Wait()
}

type RunFunc func(ctx context.Context, log logging.Logger, t Thread, h Handler) error

type Executor interface {
Completed() bool

Wait() error
}

type Handler interface {
RunAsync(ctx context.Context, f RunFunc) Executor

WaitForSubThreads(t Thread)
}

type handler struct {
lock sync.Mutex

th ThreadManager

handlers []*handler

completed chan struct{}

log logging.Logger

err error
}

func (h *handler) WaitForSubThreads(t Thread) {
for {
t.Release()

if h.subThreadsCompleted() {
return
}

time.Sleep(10 * time.Millisecond)
Comment on lines +75 to +82
Copy link

Copilot AI Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Busy-wait loop with fixed Sleep may be inefficient; consider using a synchronization primitive like sync.Cond or a WaitGroup to avoid polling.

Suggested change
for {
t.Release()
if h.subThreadsCompleted() {
return
}
time.Sleep(10 * time.Millisecond)
t.Release()
h.lock.Lock()
defer h.lock.Unlock()
for !h.subThreadsCompleted() {
h.cond.Wait()

Copilot uses AI. Check for mistakes.
Comment on lines +75 to +82
Copy link

Copilot AI Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Busy-waiting with a fixed sleep can introduce unnecessary latency or CPU churn. Consider using a channel or condition variable to signal when sub-threads complete instead of a polling loop.

Suggested change
for {
t.Release()
if h.subThreadsCompleted() {
return
}
time.Sleep(10 * time.Millisecond)
t.Release()
h.lock.Lock()
defer h.lock.Unlock()
for !h.subThreadsCompleted() {
h.cond.Wait()

Copilot uses AI. Check for mistakes.
}
}

func (h *handler) subThreadsCompleted() bool {
h.lock.Lock()
defer h.lock.Unlock()

for id := range h.handlers {
if !h.handlers[id].Completed() {
return false
}
}

return true
}

func (h *handler) Wait() error {
<-h.completed

return h.err
}

func (h *handler) Completed() bool {
select {
case <-h.completed:
return true
default:
return false
}
}

func (h *handler) RunAsync(ctx context.Context, f RunFunc) Executor {
h.lock.Lock()
defer h.lock.Unlock()

n := &handler{
th: h.th,
completed: make(chan struct{}),
log: h.log,
}

h.handlers = append(h.handlers, n)

go n.run(ctx, f)

return n
}

func (h *handler) run(ctx context.Context, entry RunFunc) {
defer close(h.completed)

err := h.runE(ctx, entry)

subErrors := make([]error, len(h.handlers))

for id := range subErrors {
subErrors[id] = h.handlers[id].Wait()
}

subError := errors.Errors(subErrors...)

h.err = errors.Errors(err, subError)
}

func (h *handler) runE(ctx context.Context, entry RunFunc) error {
t := h.th.Acquire()
defer t.Release()

log := h.log.Wrap(func(in *zerolog.Event) *zerolog.Event {
return in.Int("thread", int(t.ID()))
})

return entry(ctx, log, t, h)
}
59 changes: 59 additions & 0 deletions pkg/util/executor/executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//
// DISCLAIMER
//
// Copyright 2025 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package executor

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/arangodb/kube-arangodb/pkg/logging"
)

func Test_Executor(t *testing.T) {
ctx := context.Background()

logger := logging.Global().RegisterAndGetLogger("test", logging.Trace)

require.NoError(t, Run(ctx, logger, 1, func(ctx context.Context, log logging.Logger, th Thread, h Handler) error {
log.Info("Start main thread")
defer log.Info("Complete main thread")

h.RunAsync(ctx, func(ctx context.Context, log logging.Logger, th Thread, h Handler) error {
log.Info("Start second thread")
defer log.Info("Complete second thread")

h.RunAsync(ctx, func(ctx context.Context, log logging.Logger, th Thread, h Handler) error {
log.Info("Start third thread")
defer log.Info("Complete third thread")

return nil
})

return nil
})

h.WaitForSubThreads(th)

return nil
}))
}
97 changes: 97 additions & 0 deletions pkg/util/executor/threader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//
// DISCLAIMER
//
// Copyright 2025 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//

package executor

import "sync"

func NewThreadManager(threads int) ThreadManager {
Copy link

Copilot AI Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewThreadManager does not validate that 'threads' is positive; passing zero or negative values will cause a panic or deadlock. Consider adding input validation.

Suggested change
func NewThreadManager(threads int) ThreadManager {
func NewThreadManager(threads int) ThreadManager {
if threads <= 0 {
panic("threads must be a positive integer")
}

Copilot uses AI. Check for mistakes.
r := make(chan ThreadID, threads)

for id := 0; id < threads; id++ {
r <- ThreadID(id)
}

return &threadManager{
threads: r,
}
}

type ThreadID int

type ThreadManager interface {
Acquire() Thread
}

type threadManager struct {
threads chan ThreadID
}

func (t *threadManager) Acquire() Thread {
id := <-t.threads

return &thread{
parent: t,
id: id,
}
}

type Thread interface {
ID() ThreadID

Release()
Wait()
}

type thread struct {
lock sync.Mutex

parent *threadManager

released bool

id ThreadID
}

func (t *thread) ID() ThreadID {
return t.id
}

func (t *thread) Release() {
t.lock.Lock()
defer t.lock.Unlock()

if t.released {
return
}

t.released = true

t.parent.threads <- t.id
}

func (t *thread) Wait() {
Copy link

Copilot AI Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait sends the thread ID back to the channel without checking the released flag, which can cause duplicate sends and inconsistent state. Consider guarding against multiple sends or updating the released flag.

Copilot uses AI. Check for mistakes.
Copy link

Copilot AI Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Wait method unconditionally pushes the thread ID back into the channel without checking if it was already released, which can lead to duplicate IDs in the pool and potential deadlocks. Consider guarding against double-release or redesigning Wait to properly handle the acquire/release lifecycle.

Copilot uses AI. Check for mistakes.
t.lock.Lock()
defer t.lock.Unlock()

t.parent.threads <- t.id

t.id = <-t.parent.threads
}
Comment on lines +90 to +97
Copy link

Copilot AI Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unexported Wait method is never used; consider removing it to reduce dead code and improve clarity.

Suggested change
func (t *thread) Wait() {
t.lock.Lock()
defer t.lock.Unlock()
t.parent.threads <- t.id
t.id = <-t.parent.threads
}

Copilot uses AI. Check for mistakes.