diff --git a/DESIGN.md b/DESIGN.md index 7d1a09e..bf97b92 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -17,10 +17,11 @@ stateDiagram-v2 Handler --> Translator Translator --> Handler Handler --> Synchronizer : "nkl-synchronizer queue" - Synchronizer --> NGINXPlusLB1 - Synchronizer --> NGINXPlusLB2 - Synchronizer --> NGINXPlusLB... - Synchronizer --> NGINXPlusLBn + Synchronizer --> BorderClient : "HttpBorderClient | TcpBorderClient" + BorderClient --> NGINXPlusLB1 + BorderClient --> NGINXPlusLB2 + BorderClient --> NGINXPlusLB... + BorderClient --> NGINXPlusLBn ``` ### Settings diff --git a/internal/application/application_common_test.go b/internal/application/application_common_test.go new file mode 100644 index 0000000..e963d03 --- /dev/null +++ b/internal/application/application_common_test.go @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import ( + "errors" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + "github.com/nginxinc/kubernetes-nginx-ingress/test/mocks" +) + +const ( + deletedEventType = core.Deleted + createEventType = core.Created + upstreamName = "upstreamName" + server = "server" +) + +func buildTerrorizingBorderClient(clientType string) (Interface, *mocks.MockNginxClient, error) { + nginxClient := mocks.NewErroringMockClient(errors.New(`something went horribly horribly wrong`)) + bc, err := NewBorderClient(clientType, nginxClient) + + return bc, nginxClient, err +} + +func buildBorderClient(clientType string) (Interface, *mocks.MockNginxClient, error) { + nginxClient := mocks.NewMockNginxClient() + bc, err := NewBorderClient(clientType, nginxClient) + + return bc, nginxClient, err +} + +func buildServerUpdateEvent(eventType core.EventType, clientType string) *core.ServerUpdateEvent { + upstreamServers := core.UpstreamServers{ + { + Host: server, + }, + } + + return core.NewServerUpdateEvent(eventType, upstreamName, clientType, upstreamServers) +} diff --git a/internal/application/application_constants.go b/internal/application/application_constants.go new file mode 100644 index 0000000..fa5cd42 --- /dev/null +++ b/internal/application/application_constants.go @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +const ( + ClientTypeTcp = "tcp" + ClientTypeHttp = "http" +) diff --git a/internal/application/border_client.go b/internal/application/border_client.go new file mode 100644 index 0000000..ed971b2 --- /dev/null +++ b/internal/application/border_client.go @@ -0,0 +1,37 @@ +/* + * Copyright 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import ( + "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + "github.com/sirupsen/logrus" +) + +type Interface interface { + Update(*core.ServerUpdateEvent) error + Delete(*core.ServerUpdateEvent) error +} + +type BorderClient struct { +} + +// NewBorderClient Returns a NullBorderClient if the type is unknown, this avoids panics due to nil pointer dereferences. +func NewBorderClient(clientType string, borderClient interface{}) (Interface, error) { + logrus.Debugf(`NewBorderClient for type: %s`, clientType) + + switch clientType { + case "tcp": + return NewTcpBorderClient(borderClient) + + case "http": + return NewHttpBorderClient(borderClient) + + default: + borderClient, _ := NewNullBorderClient() + return borderClient, fmt.Errorf(`unknown border client type: %s`, clientType) + } +} diff --git a/internal/application/border_client_test.go b/internal/application/border_client_test.go new file mode 100644 index 0000000..6d5ab50 --- /dev/null +++ b/internal/application/border_client_test.go @@ -0,0 +1,52 @@ +/* + * Copyright 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import ( + "github.com/nginxinc/kubernetes-nginx-ingress/test/mocks" + "testing" +) + +func TestBorderClient_CreatesHttpBorderClient(t *testing.T) { + borderClient := mocks.MockNginxClient{} + client, err := NewBorderClient("http", borderClient) + if err != nil { + t.Errorf(`error creating border client: %v`, err) + } + + if _, ok := client.(*HttpBorderClient); !ok { + t.Errorf(`expected client to be of type HttpBorderClient`) + } +} + +func TestBorderClient_CreatesTcpBorderClient(t *testing.T) { + borderClient := mocks.MockNginxClient{} + client, err := NewBorderClient("tcp", borderClient) + if err != nil { + t.Errorf(`error creating border client: %v`, err) + } + + if _, ok := client.(*TcpBorderClient); !ok { + t.Errorf(`expected client to be of type TcpBorderClient`) + } +} + +func TestBorderClient_UnknownClientType(t *testing.T) { + unknownClientType := "unknown" + borderClient := mocks.MockNginxClient{} + client, err := NewBorderClient(unknownClientType, borderClient) + if err == nil { + t.Errorf(`expected error creating border client`) + } + + if err.Error() != `unknown border client type: unknown` { + t.Errorf(`expected error to be 'unknown border client type: unknown', got: %v`, err) + } + + if _, ok := client.(*NullBorderClient); !ok { + t.Errorf(`expected client to be of type NullBorderClient`) + } +} diff --git a/internal/application/doc.go b/internal/application/doc.go new file mode 100644 index 0000000..3254255 --- /dev/null +++ b/internal/application/doc.go @@ -0,0 +1,19 @@ +/* + * Copyright 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +/* +Package application includes support for applying updates to the Border servers. + +"Border TcpServers" are the servers that are exposed to the outside world and direct traffic into the cluster. +At this time the only supported Border TcpServers are NGINX Plus servers. The BorderClient module defines +an interface that can be implemented to support other Border Server types. + +- HttpBorderClient: updates NGINX Plus servers using HTTP Upstream methods on the NGINX Plus API. +- TcpBorderClient: updates NGINX Plus servers using Stream Upstream methods on the NGINX Plus API. + +Selection of the appropriate client is based on the Annotations present on the NodePort Service definition. +*/ + +package application diff --git a/internal/application/http_border_client.go b/internal/application/http_border_client.go new file mode 100644 index 0000000..dc7d00a --- /dev/null +++ b/internal/application/http_border_client.go @@ -0,0 +1,63 @@ +/* + * Copyright 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import ( + "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + nginxClient "github.com/nginxinc/nginx-plus-go-client/client" +) + +type HttpBorderClient struct { + BorderClient + nginxClient NginxClientInterface +} + +func NewHttpBorderClient(client interface{}) (Interface, error) { + ngxClient, ok := client.(NginxClientInterface) + if !ok { + return nil, fmt.Errorf(`expected a NginxClientInterface, got a %v`, client) + } + + return &HttpBorderClient{ + nginxClient: ngxClient, + }, nil +} + +func (hbc *HttpBorderClient) Update(event *core.ServerUpdateEvent) error { + httpUpstreamServers := asNginxHttpUpstreamServers(event.UpstreamServers) + _, _, _, err := hbc.nginxClient.UpdateHTTPServers(event.UpstreamName, httpUpstreamServers) + if err != nil { + return fmt.Errorf(`error occurred updating the nginx+ upstream server: %w`, err) + } + + return nil +} + +func (hbc *HttpBorderClient) Delete(event *core.ServerUpdateEvent) error { + err := hbc.nginxClient.DeleteHTTPServer(event.UpstreamName, event.UpstreamServers[0].Host) + if err != nil { + return fmt.Errorf(`error occurred deleting the nginx+ upstream server: %w`, err) + } + + return nil +} + +func asNginxHttpUpstreamServer(server *core.UpstreamServer) nginxClient.UpstreamServer { + return nginxClient.UpstreamServer{ + Server: server.Host, + } +} + +func asNginxHttpUpstreamServers(servers core.UpstreamServers) []nginxClient.UpstreamServer { + var upstreamServers []nginxClient.UpstreamServer + + for _, server := range servers { + upstreamServers = append(upstreamServers, asNginxHttpUpstreamServer(server)) + } + + return upstreamServers +} diff --git a/internal/application/http_border_client_test.go b/internal/application/http_border_client_test.go new file mode 100644 index 0000000..40a5fe1 --- /dev/null +++ b/internal/application/http_border_client_test.go @@ -0,0 +1,80 @@ +/* + * Copyright 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import ( + "testing" +) + +func TestHttpBorderClient_Delete(t *testing.T) { + event := buildServerUpdateEvent(deletedEventType, ClientTypeHttp) + borderClient, nginxClient, err := buildBorderClient(ClientTypeHttp) + if err != nil { + t.Fatalf(`error occurred creating a new border client: %v`, err) + } + + err = borderClient.Delete(event) + if err != nil { + t.Fatalf(`error occurred deleting the nginx+ upstream server: %v`, err) + } + + if !nginxClient.CalledFunctions["DeleteHTTPServer"] { + t.Fatalf(`expected DeleteHTTPServer to be called`) + } +} + +func TestHttpBorderClient_Update(t *testing.T) { + event := buildServerUpdateEvent(createEventType, ClientTypeHttp) + borderClient, nginxClient, err := buildBorderClient(ClientTypeHttp) + if err != nil { + t.Fatalf(`error occurred creating a new border client: %v`, err) + } + + err = borderClient.Update(event) + if err != nil { + t.Fatalf(`error occurred deleting the nginx+ upstream server: %v`, err) + } + + if !nginxClient.CalledFunctions["UpdateHTTPServers"] { + t.Fatalf(`expected UpdateHTTPServers to be called`) + } +} + +func TestHttpBorderClient_BadNginxClient(t *testing.T) { + var emptyInterface interface{} + _, err := NewBorderClient(ClientTypeHttp, emptyInterface) + if err == nil { + t.Fatalf(`expected an error to occur when creating a new border client`) + } +} + +func TestHttpBorderClient_DeleteReturnsError(t *testing.T) { + event := buildServerUpdateEvent(deletedEventType, ClientTypeHttp) + borderClient, _, err := buildTerrorizingBorderClient(ClientTypeHttp) + if err != nil { + t.Fatalf(`error occurred creating a new border client: %v`, err) + } + + err = borderClient.Delete(event) + + if err == nil { + t.Fatalf(`expected an error to occur when deleting the nginx+ upstream server`) + } +} + +func TestHttpBorderClient_UpdateReturnsError(t *testing.T) { + event := buildServerUpdateEvent(createEventType, ClientTypeHttp) + borderClient, _, err := buildTerrorizingBorderClient(ClientTypeHttp) + if err != nil { + t.Fatalf(`error occurred creating a new border client: %v`, err) + } + + err = borderClient.Update(event) + + if err == nil { + t.Fatalf(`expected an error to occur when deleting the nginx+ upstream server`) + } +} diff --git a/internal/application/nginx_client_interface.go b/internal/application/nginx_client_interface.go new file mode 100644 index 0000000..5ca986d --- /dev/null +++ b/internal/application/nginx_client_interface.go @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import nginxClient "github.com/nginxinc/nginx-plus-go-client/client" + +type NginxClientInterface interface { + DeleteStreamServer(upstream string, server string) error + UpdateStreamServers(upstream string, servers []nginxClient.StreamUpstreamServer) ([]nginxClient.StreamUpstreamServer, []nginxClient.StreamUpstreamServer, []nginxClient.StreamUpstreamServer, error) + + DeleteHTTPServer(upstream string, server string) error + UpdateHTTPServers(upstream string, servers []nginxClient.UpstreamServer) ([]nginxClient.UpstreamServer, []nginxClient.UpstreamServer, []nginxClient.UpstreamServer, error) +} diff --git a/internal/application/null_border_client.go b/internal/application/null_border_client.go new file mode 100644 index 0000000..f118068 --- /dev/null +++ b/internal/application/null_border_client.go @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import ( + "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + "github.com/sirupsen/logrus" +) + +type NullBorderClient struct { +} + +func NewNullBorderClient() (Interface, error) { + return &NullBorderClient{}, nil +} + +func (nbc *NullBorderClient) Update(_ *core.ServerUpdateEvent) error { + logrus.Warn("NullBorderClient.Update called") + return nil +} + +func (nbc *NullBorderClient) Delete(_ *core.ServerUpdateEvent) error { + logrus.Warn("NullBorderClient.Delete called") + return nil +} diff --git a/internal/application/null_border_client_test.go b/internal/application/null_border_client_test.go new file mode 100644 index 0000000..42e9dfb --- /dev/null +++ b/internal/application/null_border_client_test.go @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import "testing" + +func TestNullBorderClient_Delete(t *testing.T) { + client := NullBorderClient{} + err := client.Delete(nil) + if err != nil { + t.Errorf(`expected no error deleting border client, got: %v`, err) + } +} + +func TestNullBorderClient_Update(t *testing.T) { + client := NullBorderClient{} + err := client.Update(nil) + if err != nil { + t.Errorf(`expected no error updating border client, got: %v`, err) + } +} diff --git a/internal/application/tcp_border_client.go b/internal/application/tcp_border_client.go new file mode 100644 index 0000000..bef0bc6 --- /dev/null +++ b/internal/application/tcp_border_client.go @@ -0,0 +1,63 @@ +/* + * Copyright 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import ( + "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + nginxClient "github.com/nginxinc/nginx-plus-go-client/client" +) + +type TcpBorderClient struct { + BorderClient + nginxClient NginxClientInterface +} + +func NewTcpBorderClient(client interface{}) (Interface, error) { + ngxClient, ok := client.(NginxClientInterface) + if !ok { + return nil, fmt.Errorf(`expected a NginxClientInterface, got a %v`, client) + } + + return &TcpBorderClient{ + nginxClient: ngxClient, + }, nil +} + +func (tbc *TcpBorderClient) Update(event *core.ServerUpdateEvent) error { + streamUpstreamServers := asNginxStreamUpstreamServers(event.UpstreamServers) + _, _, _, err := tbc.nginxClient.UpdateStreamServers(event.UpstreamName, streamUpstreamServers) + if err != nil { + return fmt.Errorf(`error occurred updating the nginx+ upstream server: %w`, err) + } + + return nil +} + +func (tbc *TcpBorderClient) Delete(event *core.ServerUpdateEvent) error { + err := tbc.nginxClient.DeleteStreamServer(event.UpstreamName, event.UpstreamServers[0].Host) + if err != nil { + return fmt.Errorf(`error occurred deleting the nginx+ upstream server: %w`, err) + } + + return nil +} + +func asNginxStreamUpstreamServer(server *core.UpstreamServer) nginxClient.StreamUpstreamServer { + return nginxClient.StreamUpstreamServer{ + Server: server.Host, + } +} + +func asNginxStreamUpstreamServers(servers core.UpstreamServers) []nginxClient.StreamUpstreamServer { + var upstreamServers []nginxClient.StreamUpstreamServer + + for _, server := range servers { + upstreamServers = append(upstreamServers, asNginxStreamUpstreamServer(server)) + } + + return upstreamServers +} diff --git a/internal/application/tcp_border_client_test.go b/internal/application/tcp_border_client_test.go new file mode 100644 index 0000000..69087e5 --- /dev/null +++ b/internal/application/tcp_border_client_test.go @@ -0,0 +1,80 @@ +/* + * Copyright 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package application + +import ( + "testing" +) + +func TestTcpBorderClient_Delete(t *testing.T) { + event := buildServerUpdateEvent(deletedEventType, ClientTypeTcp) + borderClient, nginxClient, err := buildBorderClient(ClientTypeTcp) + if err != nil { + t.Fatalf(`error occurred creating a new border client: %v`, err) + } + + err = borderClient.Delete(event) + if err != nil { + t.Fatalf(`error occurred deleting the nginx+ upstream server: %v`, err) + } + + if !nginxClient.CalledFunctions["DeleteStreamServer"] { + t.Fatalf(`expected DeleteStreamServer to be called`) + } +} + +func TestTcpBorderClient_Update(t *testing.T) { + event := buildServerUpdateEvent(createEventType, ClientTypeTcp) + borderClient, nginxClient, err := buildBorderClient(ClientTypeTcp) + if err != nil { + t.Fatalf(`error occurred creating a new border client: %v`, err) + } + + err = borderClient.Update(event) + if err != nil { + t.Fatalf(`error occurred deleting the nginx+ upstream server: %v`, err) + } + + if !nginxClient.CalledFunctions["UpdateStreamServers"] { + t.Fatalf(`expected UpdateStreamServers to be called`) + } +} + +func TestTcpBorderClient_BadNginxClient(t *testing.T) { + var emptyInterface interface{} + _, err := NewBorderClient(ClientTypeTcp, emptyInterface) + if err == nil { + t.Fatalf(`expected an error to occur when creating a new border client`) + } +} + +func TestTcpBorderClient_DeleteReturnsError(t *testing.T) { + event := buildServerUpdateEvent(deletedEventType, ClientTypeTcp) + borderClient, _, err := buildTerrorizingBorderClient(ClientTypeTcp) + if err != nil { + t.Fatalf(`error occurred creating a new border client: %v`, err) + } + + err = borderClient.Delete(event) + + if err == nil { + t.Fatalf(`expected an error to occur when deleting the nginx+ upstream server`) + } +} + +func TestTcpBorderClient_UpdateReturnsError(t *testing.T) { + event := buildServerUpdateEvent(createEventType, ClientTypeTcp) + borderClient, _, err := buildTerrorizingBorderClient(ClientTypeTcp) + if err != nil { + t.Fatalf(`error occurred creating a new border client: %v`, err) + } + + err = borderClient.Update(event) + + if err == nil { + t.Fatalf(`expected an error to occur when deleting the nginx+ upstream server`) + } +} diff --git a/internal/configuration/settings.go b/internal/configuration/settings.go index bc1dc11..c394232 100644 --- a/internal/configuration/settings.go +++ b/internal/configuration/settings.go @@ -19,9 +19,10 @@ import ( ) const ( - ConfigMapsNamespace = "nkl" - ResyncPeriod = 0 - NklPrefix = ConfigMapsNamespace + "-" + ConfigMapsNamespace = "nkl" + ResyncPeriod = 0 + NklPrefix = ConfigMapsNamespace + "-" + PortAnnotationPrefix = "nginxinc.io" ) type WorkQueueSettings struct { diff --git a/internal/core/event.go b/internal/core/event.go new file mode 100644 index 0000000..b5609b1 --- /dev/null +++ b/internal/core/event.go @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package core + +import v1 "k8s.io/api/core/v1" + +type EventType int + +const ( + Created EventType = iota + Updated + Deleted +) + +type Event struct { + Type EventType + Service *v1.Service + PreviousService *v1.Service + NodeIps []string +} + +func NewEvent(eventType EventType, service *v1.Service, previousService *v1.Service, nodeIps []string) Event { + return Event{ + Type: eventType, + Service: service, + PreviousService: previousService, + NodeIps: nodeIps, + } +} diff --git a/internal/core/events.go b/internal/core/events.go deleted file mode 100644 index ecfbbc5..0000000 --- a/internal/core/events.go +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright 2023 F5 Inc. All rights reserved. - * Use of this source code is governed by the Apache License that can be found in the LICENSE file. - */ - -package core - -import ( - nginxClient "github.com/nginxinc/nginx-plus-go-client/client" - v1 "k8s.io/api/core/v1" -) - -type EventType int - -const ( - Created EventType = iota - Updated - Deleted -) - -type Event struct { - Type EventType - Service *v1.Service - PreviousService *v1.Service - NodeIps []string -} - -type ServerUpdateEvent struct { - Id string - NginxHost string - Type EventType - UpstreamName string - Servers []nginxClient.StreamUpstreamServer -} - -type ServerUpdateEvents = []*ServerUpdateEvent - -func NewEvent(eventType EventType, service *v1.Service, previousService *v1.Service, nodeIps []string) Event { - return Event{ - Type: eventType, - Service: service, - PreviousService: previousService, - NodeIps: nodeIps, - } -} - -func NewServerUpdateEvent(eventType EventType, upstreamName string, servers []nginxClient.StreamUpstreamServer) *ServerUpdateEvent { - return &ServerUpdateEvent{ - Type: eventType, - UpstreamName: upstreamName, - Servers: servers, - } -} - -func ServerUpdateEventWithIdAndHost(event *ServerUpdateEvent, id string, nginxHost string) *ServerUpdateEvent { - return &ServerUpdateEvent{ - Id: id, - NginxHost: nginxHost, - Type: event.Type, - UpstreamName: event.UpstreamName, - Servers: event.Servers, - } -} - -func (e *ServerUpdateEvent) TypeName() string { - switch e.Type { - case Created: - return "Created" - case Updated: - return "Updated" - case Deleted: - return "Deleted" - default: - return "Unknown" - } -} diff --git a/internal/core/server_update_event.go b/internal/core/server_update_event.go new file mode 100644 index 0000000..dd02127 --- /dev/null +++ b/internal/core/server_update_event.go @@ -0,0 +1,50 @@ +/* + * Copyright 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package core + +type ServerUpdateEvent struct { + ClientType string + Id string + NginxHost string + Type EventType + UpstreamName string + UpstreamServers UpstreamServers +} + +type ServerUpdateEvents = []*ServerUpdateEvent + +func NewServerUpdateEvent(eventType EventType, upstreamName string, clientType string, upstreamServers UpstreamServers) *ServerUpdateEvent { + return &ServerUpdateEvent{ + ClientType: clientType, + Type: eventType, + UpstreamName: upstreamName, + UpstreamServers: upstreamServers, + } +} + +func ServerUpdateEventWithIdAndHost(event *ServerUpdateEvent, id string, nginxHost string) *ServerUpdateEvent { + return &ServerUpdateEvent{ + ClientType: event.ClientType, + Id: id, + NginxHost: nginxHost, + Type: event.Type, + UpstreamName: event.UpstreamName, + UpstreamServers: event.UpstreamServers, + } +} + +func (e *ServerUpdateEvent) TypeName() string { + switch e.Type { + case Created: + return "Created" + case Updated: + return "Updated" + case Deleted: + return "Deleted" + default: + return "Unknown" + } +} diff --git a/internal/core/events_test.go b/internal/core/server_update_event_test.go similarity index 66% rename from internal/core/events_test.go rename to internal/core/server_update_event_test.go index d46118f..a891e23 100644 --- a/internal/core/events_test.go +++ b/internal/core/server_update_event_test.go @@ -6,12 +6,15 @@ package core import ( - nginxClient "github.com/nginxinc/nginx-plus-go-client/client" "testing" ) +const clientType = "clientType" + +var emptyUpstreamServers UpstreamServers + func TestServerUpdateEventWithIdAndHost(t *testing.T) { - event := NewServerUpdateEvent(Created, "upstream", []nginxClient.StreamUpstreamServer{}) + event := NewServerUpdateEvent(Created, "upstream", clientType, emptyUpstreamServers) if event.Id != "" { t.Errorf("expected empty Id, got %s", event.Id) @@ -30,10 +33,14 @@ func TestServerUpdateEventWithIdAndHost(t *testing.T) { if eventWithIdAndHost.NginxHost != "host" { t.Errorf("expected NginxHost to be 'host', got %s", eventWithIdAndHost.NginxHost) } + + if eventWithIdAndHost.ClientType != clientType { + t.Errorf("expected ClientType to be '%s', got %s", clientType, eventWithIdAndHost.ClientType) + } } func TestTypeNameCreated(t *testing.T) { - event := NewServerUpdateEvent(Created, "upstream", []nginxClient.StreamUpstreamServer{}) + event := NewServerUpdateEvent(Created, "upstream", clientType, emptyUpstreamServers) if event.TypeName() != "Created" { t.Errorf("expected 'Created', got %s", event.TypeName()) @@ -41,7 +48,7 @@ func TestTypeNameCreated(t *testing.T) { } func TestTypeNameUpdated(t *testing.T) { - event := NewServerUpdateEvent(Updated, "upstream", []nginxClient.StreamUpstreamServer{}) + event := NewServerUpdateEvent(Updated, "upstream", clientType, emptyUpstreamServers) if event.TypeName() != "Updated" { t.Errorf("expected 'Updated', got %s", event.TypeName()) @@ -49,7 +56,7 @@ func TestTypeNameUpdated(t *testing.T) { } func TestTypeNameDeleted(t *testing.T) { - event := NewServerUpdateEvent(Deleted, "upstream", []nginxClient.StreamUpstreamServer{}) + event := NewServerUpdateEvent(Deleted, "upstream", clientType, emptyUpstreamServers) if event.TypeName() != "Deleted" { t.Errorf("expected 'Deleted', got %s", event.TypeName()) @@ -57,7 +64,7 @@ func TestTypeNameDeleted(t *testing.T) { } func TestTypeNameUnknown(t *testing.T) { - event := NewServerUpdateEvent(EventType(100), "upstream", []nginxClient.StreamUpstreamServer{}) + event := NewServerUpdateEvent(EventType(100), "upstream", clientType, emptyUpstreamServers) if event.TypeName() != "Unknown" { t.Errorf("expected 'Unknown', got %s", event.TypeName()) diff --git a/internal/core/upstream_server.go b/internal/core/upstream_server.go new file mode 100644 index 0000000..210b001 --- /dev/null +++ b/internal/core/upstream_server.go @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package core + +type UpstreamServer struct { + Host string +} + +type UpstreamServers = []*UpstreamServer + +func NewUpstreamServer(host string) *UpstreamServer { + return &UpstreamServer{ + Host: host, + } +} diff --git a/internal/synchronization/synchronizer.go b/internal/synchronization/synchronizer.go index ecb0a62..2b78a3d 100644 --- a/internal/synchronization/synchronizer.go +++ b/internal/synchronization/synchronizer.go @@ -7,6 +7,7 @@ package synchronization import ( "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/application" "github.com/nginxinc/kubernetes-nginx-ingress/internal/communication" "github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration" "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" @@ -79,8 +80,8 @@ func (s *Synchronizer) ShutDown() { s.eventQueue.ShutDownWithDrain() } -func (s *Synchronizer) buildNginxPlusClient(nginxHost string) (*nginxClient.NginxClient, error) { - logrus.Debugf(`Synchronizer::buildNginxPlusClient for host: %s`, nginxHost) +func (s *Synchronizer) buildBorderClient(event *core.ServerUpdateEvent) (application.Interface, error) { + logrus.Debugf(`Synchronizer::buildBorderClient`) var err error @@ -89,12 +90,12 @@ func (s *Synchronizer) buildNginxPlusClient(nginxHost string) (*nginxClient.Ngin return nil, fmt.Errorf(`error creating HTTP client: %v`, err) } - client, err := nginxClient.NewNginxClient(httpClient, nginxHost) + ngxClient, err := nginxClient.NewNginxClient(httpClient, event.NginxHost) if err != nil { return nil, fmt.Errorf(`error creating Nginx Plus client: %v`, err) } - return client, nil + return application.NewBorderClient(event.ClientType, ngxClient) } func (s *Synchronizer) fanOutEventToHosts(event core.ServerUpdateEvents) core.ServerUpdateEvents { @@ -145,14 +146,13 @@ func (s *Synchronizer) handleCreatedUpdatedEvent(serverUpdateEvent *core.ServerU var err error - client, err := s.buildNginxPlusClient(serverUpdateEvent.NginxHost) + borderClient, err := s.buildBorderClient(serverUpdateEvent) if err != nil { - return fmt.Errorf(`error occurred building the nginx+ client: %w`, err) + return fmt.Errorf(`error occurred creating the border client: %w`, err) } - _, _, _, err = client.UpdateStreamServers(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers) - if err != nil { - return fmt.Errorf(`error occurred updating the nginx+ upstream servers: %w`, err) + if err = borderClient.Update(serverUpdateEvent); err != nil { + return fmt.Errorf(`error occurred updating the %s upstream servers: %w`, serverUpdateEvent.ClientType, err) } return nil @@ -163,14 +163,13 @@ func (s *Synchronizer) handleDeletedEvent(serverUpdateEvent *core.ServerUpdateEv var err error - client, err := s.buildNginxPlusClient(serverUpdateEvent.NginxHost) + borderClient, err := s.buildBorderClient(serverUpdateEvent) if err != nil { - return fmt.Errorf(`error occurred building the nginx+ client: %w`, err) + return fmt.Errorf(`error occurred creating the border client: %w`, err) } - err = client.DeleteStreamServer(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers[0].Server) - if err != nil { - return fmt.Errorf(`error occurred deleting the nginx+ upstream server: %w`, err) + if err = borderClient.Delete(serverUpdateEvent); err != nil { + return fmt.Errorf(`error occurred deleting the %s upstream servers: %w`, serverUpdateEvent.ClientType, err) } return nil diff --git a/internal/synchronization/synchronizer_test.go b/internal/synchronization/synchronizer_test.go index 4c1911e..315def7 100644 --- a/internal/synchronization/synchronizer_test.go +++ b/internal/synchronization/synchronizer_test.go @@ -32,11 +32,11 @@ func TestSynchronizer_NewSynchronizer(t *testing.T) { func TestSynchronizer_AddEventNoHosts(t *testing.T) { const expectedEventCount = 0 event := &core.ServerUpdateEvent{ - Id: "", - NginxHost: "", - Type: 0, - UpstreamName: "", - Servers: nil, + Id: "", + NginxHost: "", + Type: 0, + UpstreamName: "", + UpstreamServers: nil, } settings, err := configuration.NewSettings(context.Background(), nil) rateLimiter := &mocks.MockRateLimiter{} @@ -188,11 +188,11 @@ func buildEvents(count int) core.ServerUpdateEvents { events := make(core.ServerUpdateEvents, count) for i := 0; i < count; i++ { events[i] = &core.ServerUpdateEvent{ - Id: fmt.Sprintf("id-%v", i), - NginxHost: "https://localhost:8080", - Type: 0, - UpstreamName: "", - Servers: nil, + Id: fmt.Sprintf("id-%v", i), + NginxHost: "https://localhost:8080", + Type: 0, + UpstreamName: "", + UpstreamServers: nil, } } return events diff --git a/internal/translation/translator.go b/internal/translation/translator.go index 0bd0e02..2169251 100644 --- a/internal/translation/translator.go +++ b/internal/translation/translator.go @@ -9,7 +9,6 @@ import ( "fmt" "github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration" "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" - nginxClient "github.com/nginxinc/nginx-plus-go-client/client" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "strings" @@ -45,17 +44,21 @@ func buildServerUpdateEvents(ports []v1.ServicePort, event *core.Event) (core.Se events := core.ServerUpdateEvents{} for _, port := range ports { ingressName := fixIngressName(port.Name) - servers, _ := buildServers(event.NodeIps, port) + upstreamServers, _ := buildUpstreamServers(event.NodeIps, port) + clientType := getClientType(port.Name, event.Service.Annotations) switch event.Type { case core.Created: fallthrough + case core.Updated: - events = append(events, core.NewServerUpdateEvent(event.Type, ingressName, servers)) + events = append(events, core.NewServerUpdateEvent(event.Type, ingressName, clientType, upstreamServers)) + case core.Deleted: - for _, server := range servers { - events = append(events, core.NewServerUpdateEvent(event.Type, ingressName, []nginxClient.StreamUpstreamServer{server})) + for _, server := range upstreamServers { + events = append(events, core.NewServerUpdateEvent(event.Type, ingressName, clientType, core.UpstreamServers{server})) } + default: logrus.Warnf(`Translator::buildServerUpdateEvents: unknown event type: %d`, event.Type) } @@ -65,13 +68,12 @@ func buildServerUpdateEvents(ports []v1.ServicePort, event *core.Event) (core.Se return events, nil } -func buildServers(nodeIps []string, port v1.ServicePort) ([]nginxClient.StreamUpstreamServer, error) { - var servers []nginxClient.StreamUpstreamServer +func buildUpstreamServers(nodeIps []string, port v1.ServicePort) (core.UpstreamServers, error) { + var servers core.UpstreamServers for _, nodeIp := range nodeIps { - server := nginxClient.StreamUpstreamServer{ - Server: fmt.Sprintf("%s:%d", nodeIp, port.NodePort), - } + host := fmt.Sprintf("%s:%d", nodeIp, port.NodePort) + server := core.NewUpstreamServer(host) servers = append(servers, server) } @@ -81,3 +83,15 @@ func buildServers(nodeIps []string, port v1.ServicePort) ([]nginxClient.StreamUp func fixIngressName(name string) string { return name[4:] } + +func getClientType(portName string, annotations map[string]string) string { + key := fmt.Sprintf("%s/%s", configuration.PortAnnotationPrefix, portName) + logrus.Infof("getClientType: key=%s", key) + if annotations != nil { + if clientType, ok := annotations[key]; ok { + return clientType + } + } + + return "http" +} diff --git a/internal/translation/translator_test.go b/internal/translation/translator_test.go index bdf13b6..4e635d6 100644 --- a/internal/translation/translator_test.go +++ b/internal/translation/translator_test.go @@ -594,7 +594,7 @@ func TestDeletedTranslateManyMixedPortsAndManyNodes(t *testing.T) { func assertExpectedServerCount(t *testing.T, expectedCount int, events core.ServerUpdateEvents) { for _, translatedEvent := range events { - serverCount := len(translatedEvent.Servers) + serverCount := len(translatedEvent.UpstreamServers) if serverCount != expectedCount { t.Fatalf("expected %d servers, got %d", expectedCount, serverCount) } diff --git a/test/mocks/mock_nginx_plus_client.go b/test/mocks/mock_nginx_plus_client.go new file mode 100644 index 0000000..2c16e12 --- /dev/null +++ b/test/mocks/mock_nginx_plus_client.go @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +package mocks + +import nginxClient "github.com/nginxinc/nginx-plus-go-client/client" + +type MockNginxClient struct { + CalledFunctions map[string]bool + Error error +} + +func NewMockNginxClient() *MockNginxClient { + return &MockNginxClient{ + CalledFunctions: make(map[string]bool), + Error: nil, + } +} + +func NewErroringMockClient(err error) *MockNginxClient { + return &MockNginxClient{ + CalledFunctions: make(map[string]bool), + Error: err, + } +} + +func (m MockNginxClient) DeleteStreamServer(_ string, _ string) error { + m.CalledFunctions["DeleteStreamServer"] = true + + if m.Error != nil { + return m.Error + } + + return nil +} + +func (m MockNginxClient) UpdateStreamServers(_ string, _ []nginxClient.StreamUpstreamServer) ([]nginxClient.StreamUpstreamServer, []nginxClient.StreamUpstreamServer, []nginxClient.StreamUpstreamServer, error) { + m.CalledFunctions["UpdateStreamServers"] = true + + if m.Error != nil { + return nil, nil, nil, m.Error + } + + return nil, nil, nil, nil +} + +func (m MockNginxClient) DeleteHTTPServer(_ string, _ string) error { + m.CalledFunctions["DeleteHTTPServer"] = true + + if m.Error != nil { + return m.Error + } + + return nil +} + +func (m MockNginxClient) UpdateHTTPServers(_ string, _ []nginxClient.UpstreamServer) ([]nginxClient.UpstreamServer, []nginxClient.UpstreamServer, []nginxClient.UpstreamServer, error) { + m.CalledFunctions["UpdateHTTPServers"] = true + + if m.Error != nil { + return nil, nil, nil, m.Error + } + + return nil, nil, nil, nil +}