// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package allocation

import (
	"errors"
	"sync"

	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
	"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/target"
)

/*
	The Target Allocator serves an HTTP endpoint exposing /jobs/<job_id>/targets.
	Targets are allocated to collectors using the configured allocation strategy.
	The Target Allocator needs information about the collectors in order to set the URLs.
	It keeps a map of what each collector currently holds and updates it based on new scrape target updates.
*/

var _ Allocator = &allocator{}

// newAllocator returns an Allocator that delegates assignment decisions to the given strategy.
func newAllocator(log logr.Logger, strategy Strategy, opts ...AllocationOption) Allocator {
	a := &allocator{
		strategy:                      strategy,
		collectors:                    make(map[string]*Collector),
		targetItems:                   make(map[string]*target.Item),
		targetItemsPerJobPerCollector: make(map[string]map[string]map[string]bool),
		log:                           log,
	}
	for _, opt := range opts {
		opt(a)
	}

	return a
}

type allocator struct {
	strategy Strategy

	// collectors is a map from a Collector's name to a Collector instance
	// collectorKey -> collector pointer
	collectors map[string]*Collector

	// targetItems is a map from a target item's hash to the target item's allocated state
	// targetItem hash -> target item pointer
	targetItems map[string]*target.Item

	// collectorKey -> job -> target item hash -> true
	targetItemsPerJobPerCollector map[string]map[string]map[string]bool

	// m protects collectors, targetItems and targetItemsPerJobPerCollector for concurrent use.
	m sync.RWMutex

	log logr.Logger

	filter Filter
}
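
// For illustration only (hypothetical collector, job and hash values), the nested
// targetItemsPerJobPerCollector index above has the following shape:
//
//	map[string]map[string]map[string]bool{
//		"collector-0": {
//			"prometheus-job": {"<target hash>": true},
//		},
//	}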

// SetFilter sets the filtering hook to use.
func (a *allocator) SetFilter(filter Filter) {
	a.filter = filter
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when new
// targets are discovered or existing targets are shut down.
func (a *allocator) SetTargets(targets map[string]*target.Item) {
	timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", a.strategy.GetName()))
	defer timer.ObserveDuration()

	if a.filter != nil {
		targets = a.filter.Apply(targets)
	}
	RecordTargetsKept(targets)

	a.m.Lock()
	defer a.m.Unlock()

	// Check for target changes
	targetsDiff := diff.Maps(a.targetItems, targets)
	// If there are any additions or removals
	if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 {
		a.handleTargets(targetsDiff)
	}
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (a *allocator) SetCollectors(collectors map[string]*Collector) {
	timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", a.strategy.GetName()))
	defer timer.ObserveDuration()

	CollectorsAllocatable.WithLabelValues(a.strategy.GetName()).Set(float64(len(collectors)))
	if len(collectors) == 0 {
		a.log.Info("No collector instances present")
	}

	a.m.Lock()
	defer a.m.Unlock()

	// Check for collector changes
	collectorsDiff := diff.Maps(a.collectors, collectors)
	if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 {
		a.handleCollectors(collectorsDiff)
	}
}

// GetTargetsForCollectorAndJob returns the target items currently assigned to the given collector for the given job.
func (a *allocator) GetTargetsForCollectorAndJob(collector string, job string) []*target.Item {
	a.m.RLock()
	defer a.m.RUnlock()
	if _, ok := a.targetItemsPerJobPerCollector[collector]; !ok {
		return []*target.Item{}
	}
	if _, ok := a.targetItemsPerJobPerCollector[collector][job]; !ok {
		return []*target.Item{}
	}
	targetItemsCopy := make([]*target.Item, len(a.targetItemsPerJobPerCollector[collector][job]))
	index := 0
	for targetHash := range a.targetItemsPerJobPerCollector[collector][job] {
		targetItemsCopy[index] = a.targetItems[targetHash]
		index++
	}
	return targetItemsCopy
}

// TargetItems returns a shallow copy of the targetItems map.
func (a *allocator) TargetItems() map[string]*target.Item {
	a.m.RLock()
	defer a.m.RUnlock()
	targetItemsCopy := make(map[string]*target.Item)
	for k, v := range a.targetItems {
		targetItemsCopy[k] = v
	}
	return targetItemsCopy
}

// Collectors returns a shallow copy of the collectors map.
func (a *allocator) Collectors() map[string]*Collector {
	a.m.RLock()
	defer a.m.RUnlock()
	collectorsCopy := make(map[string]*Collector)
	for k, v := range a.collectors {
		collectorsCopy[k] = v
	}
	return collectorsCopy
}

// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector.
// Any net-new additions are assigned to a collector chosen by the allocation strategy.
func (a *allocator) handleTargets(diff diff.Changes[*target.Item]) {
	// Check for removals
	for k, item := range a.targetItems {
		// if the current item is in the removals list
		if _, ok := diff.Removals()[k]; ok {
			a.removeTargetItem(item)
		}
	}

	// Check for additions
	assignmentErrors := []error{}
	for k, item := range diff.Additions() {
		// Do nothing if the item is already there
		if _, ok := a.targetItems[k]; ok {
			continue
		}
		// TODO: track target -> collector relationship in a separate map
		item.CollectorName = ""
		// Add the item to the pool and assign a collector
		if err := a.addTargetToTargetItems(item); err != nil {
			assignmentErrors = append(assignmentErrors, err)
		}
	}

	// Report any targets that could not be assigned
	unassignedTargets := len(assignmentErrors)
	if unassignedTargets > 0 {
		err := errors.Join(assignmentErrors...)
		a.log.Info("Could not assign targets for some jobs", "targets", unassignedTargets, "error", err)
		TargetsUnassigned.Set(float64(unassignedTargets))
	}
}

// addTargetToTargetItems adds the target to the item pool and, if any collectors are
// registered, assigns it to the collector chosen by the strategy.
func (a *allocator) addTargetToTargetItems(tg *target.Item) error {
	a.targetItems[tg.Hash()] = tg
	if len(a.collectors) == 0 {
		return nil
	}

	colOwner, err := a.strategy.GetCollectorForTarget(a.collectors, tg)
	if err != nil {
		return err
	}

	// If this is a reassignment, unassign the target from its current collector first.
	// Note: the ordering here is important; we want to determine the new assignment before
	// unassigning, because the strategy might make use of the previous assignment information.
	if _, ok := a.collectors[tg.CollectorName]; ok && tg.CollectorName != "" {
		a.unassignTargetItem(tg)
	}

	tg.CollectorName = colOwner.Name
	a.addCollectorTargetItemMapping(tg)
	a.collectors[colOwner.Name].NumTargets++
	TargetsPerCollector.WithLabelValues(colOwner.String(), a.strategy.GetName()).Set(float64(a.collectors[colOwner.String()].NumTargets))

	return nil
}

// unassignTargetItem unassigns the target item from its Collector. The target item is still tracked.
func (a *allocator) unassignTargetItem(item *target.Item) {
	collectorName := item.CollectorName
	if collectorName == "" {
		return
	}
	c, ok := a.collectors[collectorName]
	if !ok {
		return
	}
	c.NumTargets--
	TargetsPerCollector.WithLabelValues(item.CollectorName, a.strategy.GetName()).Set(float64(c.NumTargets))
	delete(a.targetItemsPerJobPerCollector[item.CollectorName][item.JobName], item.Hash())
	if len(a.targetItemsPerJobPerCollector[item.CollectorName][item.JobName]) == 0 {
		delete(a.targetItemsPerJobPerCollector[item.CollectorName], item.JobName)
	}
	item.CollectorName = ""
}

// removeTargetItem unassigns the target item from its Collector and stops tracking it.
func (a *allocator) removeTargetItem(item *target.Item) {
	a.unassignTargetItem(item)
	delete(a.targetItems, item.Hash())
}

// removeCollector removes a Collector from the allocator.
func (a *allocator) removeCollector(collector *Collector) {
	delete(a.collectors, collector.Name)
	// Remove the collector from any target item records
	for _, targetItems := range a.targetItemsPerJobPerCollector[collector.Name] {
		for targetHash := range targetItems {
			a.targetItems[targetHash].CollectorName = ""
		}
	}
	delete(a.targetItemsPerJobPerCollector, collector.Name)
	TargetsPerCollector.WithLabelValues(collector.Name, a.strategy.GetName()).Set(0)
}

// addCollectorTargetItemMapping keeps track of which collector has which jobs and targets.
// This allows the allocator to respond to HTTP calls without any extra allocations.
// The caller of this method must hold the lock.
func (a *allocator) addCollectorTargetItemMapping(tg *target.Item) {
	if a.targetItemsPerJobPerCollector[tg.CollectorName] == nil {
		a.targetItemsPerJobPerCollector[tg.CollectorName] = make(map[string]map[string]bool)
	}
	if a.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] == nil {
		a.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName] = make(map[string]bool)
	}
	a.targetItemsPerJobPerCollector[tg.CollectorName][tg.JobName][tg.Hash()] = true
}

// handleCollectors receives the new and removed collectors and reconciles the current state.
// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map.
// Finally, update all targets' collector assignments.
func (a *allocator) handleCollectors(diff diff.Changes[*Collector]) {
	// Clear removed collectors
	for _, k := range diff.Removals() {
		a.removeCollector(k)
	}
	// Insert the new collectors
	for _, i := range diff.Additions() {
		a.collectors[i.Name] = NewCollector(i.Name, i.NodeName)
	}

	// Set collectors on the strategy
	a.strategy.SetCollectors(a.collectors)

	// Re-allocate all targets
	assignmentErrors := []error{}
	for _, item := range a.targetItems {
		err := a.addTargetToTargetItems(item)
		if err != nil {
			assignmentErrors = append(assignmentErrors, err)
			item.CollectorName = ""
		}
	}
	// Report any targets that could not be assigned
	unassignedTargets := len(assignmentErrors)
	if unassignedTargets > 0 {
		err := errors.Join(assignmentErrors...)
		a.log.Info("Could not assign targets for some jobs", "targets", unassignedTargets, "error", err)
		TargetsUnassigned.Set(float64(unassignedTargets))
	}
}
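
// The following sketch is illustrative only and is not part of the upstream file. It
// assumes that SetCollectors, SetTargets and GetTargetsForCollectorAndJob are exposed
// on the Allocator interface, and that a Strategy implementation is obtained elsewhere
// (for example, from the package's strategy registry). It shows the intended call
// order: register collectors, hand over the discovered targets, then query the
// per-collector, per-job assignments when serving /jobs/<job_id>/targets.
func exampleUsage(log logr.Logger, strategy Strategy, discovered map[string]*target.Item) {
	alloc := newAllocator(log, strategy)

	// Register the available collectors first so that targets can be assigned.
	alloc.SetCollectors(map[string]*Collector{
		"collector-0": NewCollector("collector-0", "node-a"),
		"collector-1": NewCollector("collector-1", "node-b"),
	})

	// Hand over the discovered targets; the allocator reconciles additions and removals.
	alloc.SetTargets(discovered)

	// Answer a scrape-config request for one collector and job ("my-job" is a placeholder).
	_ = alloc.GetTargetsForCollectorAndJob("collector-0", "my-job")
}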