Skip to content

Commit b31afb4

Browse files
fix(source/wrappers/events): events not triggered (#5687)
Signed-off-by: ivan katliarchuk <[email protected]>
1 parent d2d2b40 commit b31afb4

File tree

14 files changed

+398
-66
lines changed

14 files changed

+398
-66
lines changed

controller/execute.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,10 +426,15 @@ func buildSource(ctx context.Context, cfg *externaldns.Config) (source.Source, e
426426
}
427427
// Combine multiple sources into a single, deduplicated source.
428428
combinedSource := wrappers.NewDedupSource(wrappers.NewMultiSource(sources, sourceCfg.DefaultTargets, sourceCfg.ForceDefaultTargets))
429+
cfg.AddSourceWrapper("dedup")
430+
combinedSource = wrappers.NewNAT64Source(combinedSource, cfg.NAT64Networks)
431+
cfg.AddSourceWrapper("nat64")
429432
// Filter targets
430433
targetFilter := endpoint.NewTargetNetFilterWithExclusions(cfg.TargetNetFilter, cfg.ExcludeTargetNets)
431-
combinedSource = wrappers.NewNAT64Source(combinedSource, cfg.NAT64Networks)
432-
combinedSource = wrappers.NewTargetFilterSource(combinedSource, targetFilter)
434+
if targetFilter.IsEnabled() {
435+
combinedSource = wrappers.NewTargetFilterSource(combinedSource, targetFilter)
436+
cfg.AddSourceWrapper("target-filter")
437+
}
433438
return combinedSource, nil
434439
}
435440

controller/execute_test.go

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package controller
1818

1919
import (
2020
"bytes"
21-
"context"
2221
"fmt"
2322
"net"
2423
"net/http"
@@ -36,8 +35,8 @@ import (
3635
"github.com/stretchr/testify/require"
3736
"sigs.k8s.io/external-dns/endpoint"
3837
"sigs.k8s.io/external-dns/pkg/apis/externaldns"
39-
"sigs.k8s.io/external-dns/plan"
4038
"sigs.k8s.io/external-dns/provider"
39+
fakeprovider "sigs.k8s.io/external-dns/provider/fakes"
4140
)
4241

4342
func TestSelectRegistry(t *testing.T) {
@@ -60,7 +59,7 @@ func TestSelectRegistry(t *testing.T) {
6059
ExcludeDNSRecordTypes: []string{"TXT"},
6160
TXTCacheInterval: 60,
6261
},
63-
provider: &MockProvider{},
62+
provider: &fakeprovider.MockProvider{},
6463
wantErr: false,
6564
wantType: "DynamoDBRegistry",
6665
},
@@ -69,7 +68,7 @@ func TestSelectRegistry(t *testing.T) {
6968
cfg: &externaldns.Config{
7069
Registry: "noop",
7170
},
72-
provider: &MockProvider{},
71+
provider: &fakeprovider.MockProvider{},
7372
wantErr: false,
7473
wantType: "NoopRegistry",
7574
},
@@ -84,7 +83,7 @@ func TestSelectRegistry(t *testing.T) {
8483
ManagedDNSRecordTypes: []string{"A", "CNAME"},
8584
ExcludeDNSRecordTypes: []string{"TXT"},
8685
},
87-
provider: &MockProvider{},
86+
provider: &fakeprovider.MockProvider{},
8887
wantErr: false,
8988
wantType: "TXTRegistry",
9089
},
@@ -94,7 +93,7 @@ func TestSelectRegistry(t *testing.T) {
9493
Registry: "aws-sd",
9594
TXTOwnerID: "owner-id",
9695
},
97-
provider: &MockProvider{},
96+
provider: &fakeprovider.MockProvider{},
9897
wantErr: false,
9998
wantType: "AWSSDRegistry",
10099
},
@@ -103,7 +102,7 @@ func TestSelectRegistry(t *testing.T) {
103102
cfg: &externaldns.Config{
104103
Registry: "unknown",
105104
},
106-
provider: &MockProvider{},
105+
provider: &fakeprovider.MockProvider{},
107106
wantErr: true,
108107
wantType: "",
109108
},
@@ -477,21 +476,47 @@ func TestBuildSource(t *testing.T) {
477476
}
478477
}
479478

480-
// mocks
481-
type MockProvider struct{}
482-
483-
func (m *MockProvider) Records(ctx context.Context) ([]*endpoint.Endpoint, error) {
484-
return nil, nil
485-
}
486-
487-
func (p *MockProvider) ApplyChanges(ctx context.Context, changes *plan.Changes) error {
488-
return nil
489-
}
479+
func TestBuildSourceWithWrappers(t *testing.T) {
480+
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
481+
w.WriteHeader(http.StatusNotImplemented)
482+
}))
483+
defer svr.Close()
490484

491-
func (m *MockProvider) AdjustEndpoints(endpoints []*endpoint.Endpoint) ([]*endpoint.Endpoint, error) {
492-
return nil, nil
493-
}
485+
tests := []struct {
486+
name string
487+
cfg *externaldns.Config
488+
asserts func(*externaldns.Config)
489+
}{
490+
{
491+
name: "configuration with target filter wrapper",
492+
cfg: &externaldns.Config{
493+
APIServerURL: svr.URL,
494+
Sources: []string{"fake"},
495+
TargetNetFilter: []string{"10.0.0.0/8"},
496+
},
497+
asserts: func(cfg *externaldns.Config) {
498+
assert.True(t, cfg.IsSourceWrapperInstrumented("target-filter"))
499+
},
500+
},
501+
{
502+
name: "configuration without target filter wrapper",
503+
cfg: &externaldns.Config{
504+
APIServerURL: svr.URL,
505+
Sources: []string{"fake"},
506+
},
507+
asserts: func(cfg *externaldns.Config) {
508+
assert.True(t, cfg.IsSourceWrapperInstrumented("dedup"))
509+
assert.True(t, cfg.IsSourceWrapperInstrumented("nat64"))
510+
assert.False(t, cfg.IsSourceWrapperInstrumented("target-filter"))
511+
},
512+
},
513+
}
494514

495-
func (m *MockProvider) GetDomainFilter() endpoint.DomainFilterInterface {
496-
return nil
515+
for _, tt := range tests {
516+
t.Run(tt.name, func(t *testing.T) {
517+
_, err := buildSource(t.Context(), tt.cfg)
518+
require.NoError(t, err)
519+
tt.asserts(tt.cfg)
520+
})
521+
}
497522
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# 🧩 Source Wrappers/Middleware
2+
3+
## Overview
4+
5+
In ExternalDNS, a **Source** is a component responsible for discovering DNS records from Kubernetes resources (e.g., `Ingress`, `Service`, `Gateway`, etc.).
6+
7+
**Source Wrappers** are middleware-like components that sit between the source and the plan generation. They extend or modify the behavior of the original sources by transforming, filtering, or enriching the DNS records before they're processed by the planner and provider.
8+
9+
---
10+
11+
## Why Wrappers?
12+
13+
Wrappers solve these key challenges:
14+
15+
- ✂️ **Filtering**: Remove unwanted targets or records from sources based on labels, annotations, targets and etc.
16+
- 🔗 **Aggregation**: Combine Endpoints from multiple underlying sources. For example, from both Kubernetes Services and Ingresses.
17+
- 🧹 **Deduplication**: Prevent duplicate DNS records across sources.
18+
- 🌐 **Target transformation**: Rewrite targets for IPv6 networks or alter endpoint attributes like FQDNS or targets.
19+
- 🧪 **Testing and simulation**: Use the `FakeSource` or wrappers for dry-runs or simulations.
20+
- 🔁 **Composability**: Chain multiple behaviors without modifying core sources.
21+
- 🔐 **Access Control**: Limits endpoint exposure based on policies or user access.
22+
- 📊 **Observability**: Adds logging, debugging, or metrics around source behavior.
23+
24+
---
25+
26+
## Built In Wrappers
27+
28+
| Wrapper | Purpose | Use Case |
29+
|:--------------------:|:----------------------------------------|:--------------------------------------|
30+
| `MultiSource` | Combine multiple sources. | Aggregate `Ingress`, `Service`, etc. |
31+
| `DedupSource` | Remove duplicate DNS records. | Avoid duplicate records from sources. |
32+
| `TargetFilterSource` | Include/exclude targets based on CIDRs. | Exclude internal IPs. |
33+
| `NAT64Source` | Add NAT64-prefixed AAAA records. | Support IPv6 with NAT64. |
34+
35+
### Use Cases
36+
37+
### 1.1 `TargetFilterSource`
38+
39+
Filters targets (e.g. IPs or hostnames) based on inclusion or exclusion rules.
40+
41+
📌 **Use case**: Only publish public IPs, exclude test environments.
42+
43+
```yaml
44+
--target-net-filter=192.168.0.0/16
45+
--exclude-target-nets=10.0.0.0/8
46+
```
47+
48+
### 2.1 `NAT64Source`
49+
50+
Converts IPv4 targets to IPv6 using NAT64 prefixes.
51+
52+
📌 **Use case**: Publish AAAA records for IPv6-only clients in NAT64 environments.
53+
54+
```yaml
55+
--nat64-prefix=64:ff9b::/96
56+
```
57+
58+
---
59+
60+
## How Wrappers Work
61+
62+
Wrappers wrap a `Source` and implement the same `Source` interface (e.g., `Endpoints(ctx)`).
63+
64+
They typically follow this pattern:
65+
66+
```go
67+
package wrappers
68+
69+
type myWrapper struct {
70+
next source.Source
71+
}
72+
73+
func (m *myWrapper) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
74+
eps, err := m.next.Endpoints(ctx)
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
// Modify, filter, or enrich endpoints as needed
80+
return eps, nil
81+
}
82+
83+
// AddEventHandler must be implemented to satisfy the source.Source interface.
84+
func (m *myWrapper) AddEventHandler(ctx context.Context, handler func()) {
85+
log.Debugf("myWrapper: adding event handler")
86+
m.next.AddEventHandler(ctx, handler)
87+
}
88+
```
89+
90+
This allows wrappers to be stacked or composed together.
91+
92+
---
93+
94+
### Composition of Wrappers
95+
96+
Wrappers are often composed like this:
97+
98+
```go
99+
source := NewMultiSource(actualSources, defaultTargets)
100+
source = NewDedupSource(source)
101+
source = NewNAT64Source(source, cfg.NAT64Networks)
102+
source = NewTargetFilterSource(source, targetFilter)
103+
```
104+
105+
Each wrapper processes the output of the previous one.
106+
107+
---
108+
109+
## High Level Design
110+
111+
- Source: Implements the base logic for extracting DNS endpoints (e.g. IngressSource, ServiceSource, etc.)
112+
- Wrappers: Decorate the source (e.g. DedupSource, TargetFilterSource) to enhance or filter endpoint data
113+
- Plan: Compares the endpoints from Source with DNS state from Provider and produces create/update/delete changes
114+
- Provider: Applies changes to actual DNS services (e.g. Route53, Cloudflare, Azure DNS)
115+
116+
```mermaid
117+
sequenceDiagram
118+
participant ExternalDNS
119+
participant Source
120+
participant Wrapper
121+
participant DedupWrapper as DedupSource
122+
participant Provider
123+
participant Plan
124+
125+
ExternalDNS->>Source: Initialize source (e.g. Ingress, Service)
126+
Source-->>ExternalDNS: Implements Source interface
127+
128+
ExternalDNS->>Wrapper: Wrap with decorators (e.g. dedup, filters)
129+
Wrapper->>DedupWrapper: Compose with DedupSource
130+
DedupWrapper-->>Wrapper: Return enriched Source
131+
132+
Wrapper-->>ExternalDNS: Return final wrapped Source
133+
134+
ExternalDNS->>Plan: Generate plan from Source
135+
Plan->>Wrapper: Call Endpoints(ctx)
136+
Wrapper->>DedupWrapper: Call Endpoints(ctx)
137+
DedupWrapper->>Source: Call Endpoints(ctx)
138+
Source-->>DedupWrapper: Return []*Endpoint
139+
DedupWrapper-->>Wrapper: Return de-duplicated []*Endpoint
140+
Wrapper-->>Plan: Return transformed []*Endpoint
141+
142+
ExternalDNS->>Provider: ApplyChanges(plan)
143+
Provider-->>ExternalDNS: Sync DNS records
144+
```
145+
146+
## Learn More
147+
148+
- [Source Interface](https://github.com/kubernetes-sigs/external-dns/blob/master/source/source.go)
149+
- [Wrappers Source Code](https://github.com/kubernetes-sigs/external-dns/tree/master/source/wrappers)

internal/testutils/mock_source.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,20 @@ import (
2828
// MockSource returns mock endpoints.
2929
type MockSource struct {
3030
mock.Mock
31+
endpoints []*endpoint.Endpoint
32+
}
33+
34+
func NewMockSource(endpoints ...*endpoint.Endpoint) *MockSource {
35+
m := &MockSource{
36+
endpoints: endpoints,
37+
}
38+
m.On("Endpoints").Return(endpoints, nil)
39+
m.On("AddEventHandler", mock.AnythingOfType("*context.cancelCtx")).Return()
40+
return m
3141
}
3242

3343
// Endpoints returns the desired mock endpoints.
34-
func (m *MockSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error) {
44+
func (m *MockSource) Endpoints(_ context.Context) ([]*endpoint.Endpoint, error) {
3545
args := m.Called()
3646

3747
endpoints := args.Get(0)
@@ -44,6 +54,10 @@ func (m *MockSource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, error
4454

4555
// AddEventHandler adds an event handler that should be triggered if something in source changes
4656
func (m *MockSource) AddEventHandler(ctx context.Context, handler func()) {
57+
m.Called(ctx)
58+
if handler == nil {
59+
return
60+
}
4761
go func() {
4862
ticker := time.NewTicker(5 * time.Second)
4963
defer ticker.Stop()

pkg/apis/externaldns/types.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ type Config struct {
213213
NAT64Networks []string
214214
ExcludeUnschedulable bool
215215
ForceDefaultTargets bool
216+
sourceWrappers map[string]bool // map of source wrappers, e.g. "targetfilter", "nat64"
216217
}
217218

218219
var defaultConfig = &Config{
@@ -376,6 +377,7 @@ var defaultConfig = &Config{
376377
WebhookServer: false,
377378
ZoneIDFilter: []string{},
378379
ForceDefaultTargets: false,
380+
sourceWrappers: map[string]bool{},
379381
}
380382

381383
// NewConfig returns new Config object
@@ -427,6 +429,22 @@ func (cfg *Config) ParseFlags(args []string) error {
427429
return nil
428430
}
429431

432+
func (cfg *Config) AddSourceWrapper(name string) {
433+
if cfg.sourceWrappers == nil {
434+
cfg.sourceWrappers = make(map[string]bool)
435+
}
436+
cfg.sourceWrappers[name] = true
437+
}
438+
439+
// IsSourceWrapperInstrumented returns whether a source wrapper is enabled or not.
440+
func (cfg *Config) IsSourceWrapperInstrumented(name string) bool {
441+
if cfg.sourceWrappers == nil {
442+
return false
443+
}
444+
_, ok := cfg.sourceWrappers[name]
445+
return ok
446+
}
447+
430448
func App(cfg *Config) *kingpin.Application {
431449
app := kingpin.New("external-dns", "ExternalDNS synchronizes exposed Kubernetes Services and Ingresses with DNS providers.\n\nNote that all flags may be replaced with env vars - `--flag` -> `EXTERNAL_DNS_FLAG=1` or `--flag value` -> `EXTERNAL_DNS_FLAG=value`")
432450
app.Version(Version)

0 commit comments

Comments
 (0)