Skip to content

Commit 2f22c34

Browse files
constanca-mdmathieu
authored andcommitted
[extension/azureauth] Add support to be used in receivers (open-telemetry#39033)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description For the extension to have support to be used in receivers, it needs to implement: - Interface `extensionauth.Server` so that it can check if requests have valid authentication. - Interface `azcore.TokenCredential` so the receivers can get a token if necessary (it will be for all azure receivers). <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#39012. <!--Describe what testing was performed and which tests were added.--> #### Testing I have added unit tests. <!--Describe the documentation added.--> #### Documentation Code comments and README file updated.
1 parent 159ab3a commit 2f22c34

File tree

11 files changed

+519
-21
lines changed

11 files changed

+519
-21
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: azureauthextension
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implement extensionauth.Server and azcore.TokenCredential.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39012]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

extension/azureauthextension/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
1111
<!-- end autogenerated section -->
1212

13-
This extension implements both `extensionauth.Client` and `extensionauth.Server`, so it can be used in both exporters and receivers.
13+
This extension implements both `extensionauth.HTTPClient` and `extensionauth.Server`, so it can be used in both exporters and receivers.
14+
15+
Additionally, the extension also implements `azcore.TokenCredential` so that Azure components can get the token by running the function `GetToken`. If the component supports HTTP client, then this should not be necessary, as the token will be placed in the authorization header.
1416

1517
It supports 4 different types of authentication:
1618
- Managed identity for Azure resources
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package azureauthextension
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
11+
)
12+
13+
func BenchmarkGetCurrentToken(b *testing.B) {
14+
auth := &authenticator{}
15+
auth.token.Store(azcore.AccessToken{Token: "test", ExpiresOn: time.Now().Add(time.Hour)})
16+
17+
b.ResetTimer()
18+
for i := 0; i < b.N; i++ {
19+
_, _ = auth.getCurrentToken()
20+
}
21+
}
22+
23+
func BenchmarkGetCurrentTokenParallel(b *testing.B) {
24+
auth := &authenticator{}
25+
auth.token.Store(azcore.AccessToken{Token: "test", ExpiresOn: time.Now().Add(time.Hour)})
26+
27+
b.RunParallel(func(pb *testing.PB) {
28+
for pb.Next() {
29+
_, _ = auth.getCurrentToken()
30+
}
31+
})
32+
}

extension/azureauthextension/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ var (
2424
errEmptyFederatedTokenFile = errors.New(`empty "federated_token_file" field`)
2525
errEmptyAuthentication = fmt.Errorf("authentication configuration is empty, please choose one of %s", validOptions)
2626
errEmptyScope = errors.New(`the "scope" field for the token permissions is empty`)
27+
errMutuallyExclusiveAuth = errors.New(`"client_secret" and "client_certificate_path" are mutually exclusive`)
2728
)
2829

2930
type Config struct {
@@ -89,6 +90,8 @@ func (cfg *ServicePrincipal) Validate() error {
8990
}
9091
if cfg.ClientCertificatePath == "" && cfg.ClientSecret == "" {
9192
errs = append(errs, errEmptyClientCredential)
93+
} else if cfg.ClientCertificatePath != "" && cfg.ClientSecret != "" {
94+
errs = append(errs, errMutuallyExclusiveAuth)
9295
}
9396

9497
if len(errs) > 0 {

extension/azureauthextension/config_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ func TestLoadConfig(t *testing.T) {
6161
},
6262
},
6363
},
64+
{
65+
id: component.NewIDWithName(metadata.Type, "service_principal_mutually_exclusive"),
66+
expectedErr: fmt.Sprintf("%s: %s", "service_principal", errMutuallyExclusiveAuth.Error()),
67+
},
6468
{
6569
id: component.NewIDWithName(metadata.Type, "service_principal_empty_client_id"),
6670
expectedErr: fmt.Sprintf("%s: %s", "service_principal", errEmptyClientID.Error()),

extension/azureauthextension/extension.go

Lines changed: 229 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,59 +5,270 @@ package azureauthextension // import "github.com/open-telemetry/opentelemetry-co
55

66
import (
77
"context"
8+
"crypto"
9+
"crypto/x509"
10+
"errors"
11+
"fmt"
812
"net/http"
13+
"os"
14+
"strings"
15+
"sync/atomic"
16+
"time"
917

1018
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
19+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
20+
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
1121
"go.opentelemetry.io/collector/component"
1222
"go.opentelemetry.io/collector/extension"
1323
"go.opentelemetry.io/collector/extension/extensionauth"
1424
"go.uber.org/zap"
1525
)
1626

27+
const (
28+
scheme = "Bearer"
29+
authorizationHeader = "Authorization"
30+
)
31+
32+
var (
33+
errEmptyAuthorizationHeader = errors.New("empty authorization header")
34+
errMissingAuthorizationHeader = errors.New("missing authorization header")
35+
errUnexpectedAuthorizationFormat = errors.New(`unexpected authorization value format, expected to be of format "Bearer <token>"`)
36+
errUnexpectedToken = errors.New("received token does not match the expected one")
37+
errUnavailableToken = errors.New("azure authenticator has no access to token")
38+
)
39+
1740
type authenticator struct {
41+
scope string
1842
credential azcore.TokenCredential
19-
logger *zap.Logger
43+
44+
stopCh chan int
45+
token atomic.Value
46+
47+
logger *zap.Logger
2048
}
2149

2250
var (
2351
_ extension.Extension = (*authenticator)(nil)
2452
_ extensionauth.HTTPClient = (*authenticator)(nil)
2553
_ extensionauth.Server = (*authenticator)(nil)
54+
_ azcore.TokenCredential = (*authenticator)(nil)
2655
)
2756

28-
func newAzureAuthenticator(_ *Config, logger *zap.Logger) *authenticator {
57+
func newAzureAuthenticator(cfg *Config, logger *zap.Logger) (*authenticator, error) {
2958
var credential azcore.TokenCredential
30-
// TODO
31-
// if cfg.UseDefault {
32-
// }
33-
// if cfg.Workload != nil {
34-
// }
35-
// if cfg.Managed != nil {
36-
// }
37-
// if cfg.ServicePrincipal != nil {
38-
// }
59+
var err error
60+
failMsg := "failed to create authenticator using"
61+
62+
if cfg.UseDefault {
63+
if credential, err = azidentity.NewDefaultAzureCredential(nil); err != nil {
64+
return nil, fmt.Errorf("%s default identity: %w", failMsg, err)
65+
}
66+
}
67+
68+
if cfg.Workload != nil {
69+
if credential, err = azidentity.NewWorkloadIdentityCredential(&azidentity.WorkloadIdentityCredentialOptions{
70+
ClientID: cfg.Workload.ClientID,
71+
TenantID: cfg.Workload.TenantID,
72+
TokenFilePath: cfg.Workload.FederatedTokenFile,
73+
}); err != nil {
74+
return nil, fmt.Errorf("%s workload identity: %w", failMsg, err)
75+
}
76+
}
77+
78+
if cfg.Managed != nil {
79+
clientID := cfg.Managed.ClientID
80+
var options *azidentity.ManagedIdentityCredentialOptions
81+
if clientID != "" {
82+
options = &azidentity.ManagedIdentityCredentialOptions{
83+
ID: azidentity.ClientID(clientID),
84+
}
85+
}
86+
if credential, err = azidentity.NewManagedIdentityCredential(options); err != nil {
87+
return nil, fmt.Errorf("%s managed identity: %w", failMsg, err)
88+
}
89+
}
90+
91+
if cfg.ServicePrincipal != nil {
92+
if cfg.ServicePrincipal.ClientCertificatePath != "" {
93+
cert, privateKey, errParse := getCertificateAndKey(cfg.ServicePrincipal.ClientCertificatePath)
94+
if errParse != nil {
95+
return nil, fmt.Errorf("%s service principal with certificate: %w", failMsg, errParse)
96+
}
97+
98+
if credential, err = azidentity.NewClientCertificateCredential(
99+
cfg.ServicePrincipal.TenantID,
100+
cfg.ServicePrincipal.ClientID,
101+
[]*x509.Certificate{cert},
102+
privateKey,
103+
nil,
104+
); err != nil {
105+
return nil, fmt.Errorf("%s service principal with certificate: %w", failMsg, err)
106+
}
107+
}
108+
if cfg.ServicePrincipal.ClientSecret != "" {
109+
if credential, err = azidentity.NewClientSecretCredential(
110+
cfg.ServicePrincipal.TenantID,
111+
cfg.ServicePrincipal.ClientID,
112+
cfg.ServicePrincipal.ClientSecret,
113+
nil,
114+
); err != nil {
115+
return nil, fmt.Errorf("%s service principal with secret: %w", failMsg, err)
116+
}
117+
}
118+
}
119+
39120
return &authenticator{
121+
scope: cfg.Scope,
40122
credential: credential,
123+
stopCh: make(chan int, 1),
124+
token: atomic.Value{},
41125
logger: logger,
126+
}, nil
127+
}
128+
129+
// getCertificateAndKey from the file
130+
func getCertificateAndKey(filename string) (*x509.Certificate, crypto.PrivateKey, error) {
131+
data, err := os.ReadFile(filename)
132+
if err != nil {
133+
return nil, nil, fmt.Errorf("could not read the certificate file: %w", err)
134+
}
135+
136+
certs, privateKey, err := azidentity.ParseCertificates(data, nil)
137+
if err != nil {
138+
return nil, nil, fmt.Errorf("failed to parse certificates: %w", err)
42139
}
140+
141+
return certs[0], privateKey, nil
43142
}
44143

45-
func (a authenticator) Start(_ context.Context, _ component.Host) error {
46-
// TODO
144+
func (a *authenticator) Start(ctx context.Context, _ component.Host) error {
145+
go a.trackToken(ctx)
47146
return nil
48147
}
49148

50-
func (a authenticator) Shutdown(_ context.Context) error {
51-
// TODO
149+
// updateToken makes a request to get a new token
150+
// if the authenticator does not have a token or
151+
// it has expired.
152+
func (a *authenticator) updateToken(ctx context.Context) (time.Time, error) {
153+
if a.credential == nil {
154+
return time.Time{}, errors.New("authenticator does not have credentials configured")
155+
}
156+
token, err := a.credential.GetToken(ctx, policy.TokenRequestOptions{
157+
Scopes: []string{a.scope},
158+
})
159+
if err != nil {
160+
// TODO Handle retries
161+
return time.Time{}, fmt.Errorf("azure authenticator failed to get token: %w", err)
162+
}
163+
a.token.Store(token)
164+
return token.ExpiresOn, nil
165+
}
166+
167+
// trackToken runs on the background to refresh
168+
// the token if it expires
169+
func (a *authenticator) trackToken(ctx context.Context) {
170+
expiresOn, err := a.updateToken(ctx)
171+
if err != nil {
172+
// TODO Handle retries
173+
a.logger.Error("failed to update the token", zap.Error(err))
174+
return
175+
}
176+
177+
getRefresh := func(expiresOn time.Time) time.Duration {
178+
// Refresh at 95% token lifetime
179+
return time.Until(expiresOn) * 95 / 100
180+
}
181+
182+
t := time.NewTicker(getRefresh(expiresOn))
183+
defer t.Stop()
184+
185+
for {
186+
select {
187+
case <-ctx.Done():
188+
a.logger.Info(
189+
"azure authenticator no longer refreshing the token",
190+
zap.String("reason", "context done"),
191+
)
192+
return
193+
case <-a.stopCh:
194+
a.logger.Info(
195+
"azure authenticator no longer refreshing the token",
196+
zap.String("reason", "received stop signal"),
197+
)
198+
close(a.stopCh)
199+
return
200+
case <-t.C:
201+
expiresOn, err = a.updateToken(ctx)
202+
if err != nil {
203+
// TODO Handle retries
204+
a.logger.Error("failed to update the token", zap.Error(err))
205+
a.stopCh <- 1
206+
} else {
207+
t.Reset(getRefresh(expiresOn))
208+
}
209+
}
210+
}
211+
}
212+
213+
func (a *authenticator) Shutdown(_ context.Context) error {
214+
select {
215+
case a.stopCh <- 1:
216+
default: // already stopped
217+
}
52218
return nil
53219
}
54220

55-
func (a authenticator) Authenticate(ctx context.Context, _ map[string][]string) (context.Context, error) {
56-
// TODO
221+
func (a *authenticator) getCurrentToken() (azcore.AccessToken, error) {
222+
token := a.token.Load()
223+
if token == nil {
224+
return azcore.AccessToken{}, errUnavailableToken
225+
}
226+
227+
return token.(azcore.AccessToken), nil
228+
}
229+
230+
// GetToken returns an access token with a
231+
// valid token for authorization
232+
func (a *authenticator) GetToken(_ context.Context, _ policy.TokenRequestOptions) (azcore.AccessToken, error) {
233+
return a.getCurrentToken()
234+
}
235+
236+
// Authenticate adds an Authorization header
237+
// with the bearer token
238+
func (a *authenticator) Authenticate(ctx context.Context, headers map[string][]string) (context.Context, error) {
239+
auth, ok := headers[authorizationHeader]
240+
if !ok {
241+
auth, ok = headers[strings.ToLower(authorizationHeader)]
242+
}
243+
if !ok {
244+
return ctx, errMissingAuthorizationHeader
245+
}
246+
if len(auth) == 0 {
247+
return ctx, errEmptyAuthorizationHeader
248+
}
249+
250+
firstAuth := strings.Split(auth[0], " ")
251+
if len(firstAuth) != 2 {
252+
return ctx, errUnexpectedAuthorizationFormat
253+
}
254+
if firstAuth[0] != scheme {
255+
return ctx, fmt.Errorf("expected %q scheme, got %q", scheme, firstAuth[0])
256+
}
257+
258+
currentToken, err := a.getCurrentToken()
259+
if err != nil {
260+
return ctx, err
261+
}
262+
263+
if firstAuth[1] != currentToken.Token {
264+
return ctx, errUnexpectedToken
265+
}
266+
57267
return ctx, nil
58268
}
59269

60-
func (a authenticator) RoundTripper(_ http.RoundTripper) (http.RoundTripper, error) {
270+
func (a *authenticator) RoundTripper(_ http.RoundTripper) (http.RoundTripper, error) {
61271
// TODO
272+
// See request header: https://learn.microsoft.com/en-us/rest/api/azure/#request-header
62273
return nil, nil
63274
}

0 commit comments

Comments
 (0)