Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 101 additions & 36 deletions controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const podIP1 = "172.17.0.12"
const podIP1v6 = "2001:db8::68"
const podIPv6Dual = "2001:db8::94"
const podIP2 = "172.17.0.13"
const podIP3 = "172.17.0.20"
const podIP4 = "172.17.0.21"
const podIPOpaque = "172.17.0.14"
const podIPSkipped = "172.17.0.15"
const podIPPolicy = "172.17.0.16"
Expand All @@ -50,6 +52,7 @@ const externalIPv6 = "2001:db8::78"
const externalWorkloadIP = "200.1.1.1"
const externalWorkloadIPPolicy = "200.1.1.2"
const port uint32 = 8989
const linkerdAdminPort uint32 = 4191
const opaquePort uint32 = 4242
const skippedPort uint32 = 24224

Expand Down Expand Up @@ -613,6 +616,84 @@ func TestGetProfiles(t *testing.T) {
}
})

t.Run("Profile gets updated after pod becomes ready", func(t *testing.T) {
server := makeServer(t)

// podIP3 is initially not ready
stream := profileStream(t, server, podIP3, linkerdAdminPort, "ns:ns")
defer stream.Cancel()

updates := stream.Updates()
if len(updates) != 1 {
t.Fatalf("Expected 1 update but got %d: %v", len(updates), updates)
}

profile := updates[0]
if profile.Endpoint == nil {
t.Fatalf("Expected response to have endpoint field")
}
if profile.Endpoint.TlsIdentity != nil {
t.Fatalf("Expected endpoint TLS identity to be nil but was %v", profile.Endpoint.TlsIdentity)
}

pod, err := server.k8sAPI.Client.CoreV1().Pods("ns").Get(context.Background(), "name1-20", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve pod: %s", err)
}
pod.Status.Phase = corev1.PodRunning
_, err = server.k8sAPI.Client.CoreV1().Pods("ns").Update(context.Background(), pod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update pod: %s", err)
}

profile = getLastProfileUpdate(t, stream, 2)
if profile.Endpoint == nil {
t.Fatalf("Expected response to have endpoint field")
}
if profile.Endpoint.TlsIdentity == nil {
t.Fatalf("Expected endpoint TLS identity to not be nil")
}
})

t.Run("Profile gets updated after pod becomes ready (native sidecar)", func(t *testing.T) {
server := makeServer(t)

// podIP3 is initially not ready
stream := profileStream(t, server, podIP4, linkerdAdminPort, "ns:ns")
defer stream.Cancel()

updates := stream.Updates()
if len(updates) != 1 {
t.Fatalf("Expected 1 update but got %d: %v", len(updates), updates)
}

profile := updates[0]
if profile.Endpoint == nil {
t.Fatalf("Expected response to have endpoint field")
}
if profile.Endpoint.TlsIdentity != nil {
t.Fatalf("Expected endpoint TLS identity to be nil but was %v", profile.Endpoint.TlsIdentity)
}

pod, err := server.k8sAPI.Client.CoreV1().Pods("ns").Get(context.Background(), "name1-21", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to retrieve pod: %s", err)
}
pod.Status.Phase = corev1.PodRunning
_, err = server.k8sAPI.Client.CoreV1().Pods("ns").Update(context.Background(), pod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update pod: %s", err)
}

profile = getLastProfileUpdate(t, stream, 2)
if profile.Endpoint == nil {
t.Fatalf("Expected response to have endpoint field")
}
if profile.Endpoint.TlsIdentity == nil {
t.Fatalf("Expected endpoint TLS identity to not be nil")
}
})

t.Run("Return profile with endpoint when using externalworkload IP", func(t *testing.T) {
server := makeServer(t)
http2Params := pb.Http2ClientParams{
Expand Down Expand Up @@ -952,17 +1033,7 @@ func TestGetProfiles(t *testing.T) {
if err != nil {
t.Fatalf("Failed to delete pod: %s", err)
}
err = testutil.RetryFor(time.Second*10, func() error {
updates := stream.Updates()
if len(updates) < 2 {
return fmt.Errorf("expected 2 updates, got %d", len(updates))
}
return nil
})
if err != nil {
t.Fatal(err)
}
profile = stream.Updates()[1]
profile = getLastProfileUpdate(t, stream, 2)
dstPod = profile.Endpoint.MetricLabels["pod"]
if dstPod != "" {
t.Fatalf("Expected no dst_pod but got %s", dstPod)
Expand Down Expand Up @@ -1027,18 +1098,7 @@ func TestGetProfiles(t *testing.T) {
t.Fatalf("Failed to create pod: %s", err)
}

err = testutil.RetryFor(time.Second*10, func() error {
updates := stream.Updates()
if len(updates) < 3 {
return fmt.Errorf("expected 3 updates, got %d", len(updates))
}
return nil
})
if err != nil {
t.Fatal(err)
}

profile = stream.Updates()[2]
profile = getLastProfileUpdate(t, stream, 3)
dstPod = profile.Endpoint.MetricLabels["pod"]
if dstPod != "hostport-mapping-2" {
t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping-2", dstPod)
Expand Down Expand Up @@ -1067,19 +1127,7 @@ func TestGetProfiles(t *testing.T) {
},
}, metav1.CreateOptions{})

var updates []*pb.DestinationProfile
err = testutil.RetryFor(time.Second*10, func() error {
updates = stream.Updates()
if len(updates) < 4 {
return fmt.Errorf("expected 4 updates, got %d", len(updates))
}
return nil
})
if err != nil {
t.Fatal(err)
}

profile = stream.Updates()[3]
profile = getLastProfileUpdate(t, stream, 4)
if !profile.OpaqueProtocol {
t.Fatal("Expected OpaqueProtocol=true")
}
Expand Down Expand Up @@ -1316,3 +1364,20 @@ func profileStream(t *testing.T, server *server, host string, port uint32, token

return stream
}

func getLastProfileUpdate(t *testing.T, stream *bufferingGetProfileStream, expectedUpdates int) *pb.DestinationProfile {
t.Helper()

err := testutil.RetryFor(time.Second*10, func() error {
updates := stream.Updates()
if len(updates) < expectedUpdates {
return fmt.Errorf("expected %d updates, got %d", expectedUpdates, len(updates))
}
return nil
})
if err != nil {
t.Fatal(err)
}

return stream.Updates()[expectedUpdates-1]
}
56 changes: 56 additions & 0 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,62 @@ spec:
isRetryable: false
condition:
pathRegex: "/a/b/c"`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
name: name1-20
namespace: ns
status:
phase: Pending
conditions:
- type: Ready
status: "False"
podIP: 172.17.0.20
podIPs:
- ip: 172.17.0.20
spec:
containers:
- env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
- name: LINKERD2_PROXY_ADMIN_LISTEN_ADDR
value: 0.0.0.0:4191
- name: LINKERD2_PROXY_CONTROL_LISTEN_ADDR
value: 0.0.0.0:4190
name: linkerd-proxy
ports:
- containerPort: 4191`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
linkerd.io/control-plane-ns: linkerd
name: name1-21
namespace: ns
status:
phase: Pending
conditions:
- type: Ready
status: "False"
podIP: 172.17.0.21
podIPs:
- ip: 172.17.0.21
spec:
initContainers:
- env:
- name: LINKERD2_PROXY_INBOUND_LISTEN_ADDR
value: 0.0.0.0:4143
- name: LINKERD2_PROXY_ADMIN_LISTEN_ADDR
value: 0.0.0.0:4191
- name: LINKERD2_PROXY_CONTROL_LISTEN_ADDR
value: 0.0.0.0:4190
name: linkerd-proxy
ports:
- containerPort: 4191`,
}

clientSP := []string{
Expand Down
2 changes: 1 addition & 1 deletion controller/api/destination/watcher/workload_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (ww *WorkloadWatcher) submitPodUpdate(pod *corev1.Pod, remove bool) {
submitPod = nil
}

for _, container := range pod.Spec.Containers {
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
for _, containerPort := range container.Ports {
if containerPort.ContainerPort != 0 {
for _, pip := range pod.Status.PodIPs {
Expand Down
Loading