Skip to content

Commit 606d0e4

Browse files
anchitjemasab
andauthored
[KIP-1139] Add support for OAuth jwt-bearer grant type (#4978)
support for `jwt-bearer` grant type as mandated by the KIP. --------- Co-authored-by: Emanuele Sabellico <[email protected]>
1 parent a222266 commit 606d0e4

24 files changed

+1821
-165
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
librdkafka v2.11.0 is a feature release:
44

55
* [KIP-1102](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code) Enable clients to rebootstrap based on timeout or error code (#4981).
6+
* [KIP-1139](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1139%3A+Add+support+for+OAuth+jwt-bearer+grant+type) Add support for OAuth jwt-bearer grant type (#4978).
67
* Fix for poll ratio calculation in case the queues are forwarded (#5017).
78
* Fix data race when buffer queues are being reset instead of being
89
initialized (#4718).

CONFIGURATION.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,25 @@ enable.sasl.oauthbearer.unsecure.jwt | * | true, false | false
100100
oauthbearer_token_refresh_cb | * | | | low | SASL/OAUTHBEARER token refresh callback (set with rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by rd_kafka_poll(), et.al. This callback will be triggered when it is time to refresh the client's OAUTHBEARER token. Also see `rd_kafka_conf_enable_sasl_queue()`. <br>*Type: see dedicated API*
101101
sasl.oauthbearer.method | * | default, oidc | default | low | Set to "default" or "oidc" to control which login method to be used. If set to "oidc", the following properties must also be be specified: `sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`, and `sasl.oauthbearer.token.endpoint.url`. <br>*Type: enum value*
102102
sasl.oauthbearer.client.id | * | | | low | Public identifier for the application. Must be unique across all clients that the authorization server handles. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
103+
sasl.oauthbearer.client.credentials.client.id | * | | | low | Alias for `sasl.oauthbearer.client.id`: Public identifier for the application. Must be unique across all clients that the authorization server handles. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
104+
sasl.oauthbearer.client.credentials.client.secret | * | | | low | Alias for `sasl.oauthbearer.client.secret`: Client secret only known to the application and the authorization server. This should be a sufficiently random string that is not guessable. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
103105
sasl.oauthbearer.client.secret | * | | | low | Client secret only known to the application and the authorization server. This should be a sufficiently random string that is not guessable. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
104106
sasl.oauthbearer.scope | * | | | low | Client use this to specify the scope of the access request to the broker. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
105107
sasl.oauthbearer.extensions | * | | | low | Allow additional information to be provided to the broker. Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea".Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
106108
sasl.oauthbearer.token.endpoint.url | * | | | low | OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token. Only used when `sasl.oauthbearer.method` is set to "oidc". <br>*Type: string*
109+
sasl.oauthbearer.grant.type | * | client_credentials, urn:ietf:params:oauth:grant-type:jwt-bearer | client_credentials | low | OAuth grant type to use when communicating with the identity provider. <br>*Type: enum value*
110+
sasl.oauthbearer.assertion.algorithm | * | RS256, ES256 | RS256 | low | Algorithm the client should use to sign the assertion sent to the identity provider and in the OAuth alg header in the JWT assertion. <br>*Type: enum value*
111+
sasl.oauthbearer.assertion.private.key.file | * | | | low | Path to client's private key (PEM) used for authentication when using the JWT assertion. <br>*Type: string*
112+
sasl.oauthbearer.assertion.private.key.passphrase | * | | | low | Private key passphrase for `sasl.oauthbearer.assertion.private.key.file` or `sasl.oauthbearer.assertion.private.key.pem`. <br>*Type: string*
113+
sasl.oauthbearer.assertion.private.key.pem | * | | | low | Client's private key (PEM) used for authentication when using the JWT assertion. <br>*Type: string*
114+
sasl.oauthbearer.assertion.file | * | | | low | Path to the assertion file. Only used when `sasl.oauthbearer.method` is set to "oidc" and JWT assertion is needed. <br>*Type: string*
115+
sasl.oauthbearer.assertion.claim.aud | * | | | low | JWT audience claim. Only used when `sasl.oauthbearer.method` is set to "oidc" and JWT assertion is needed. <br>*Type: string*
116+
sasl.oauthbearer.assertion.claim.exp.seconds | * | 1 .. 2147483647 | 300 | low | Assertion expiration time in seconds. Only used when `sasl.oauthbearer.method` is set to "oidc" and JWT assertion is needed. <br>*Type: integer*
117+
sasl.oauthbearer.assertion.claim.iss | * | | | low | JWT issuer claim. Only used when `sasl.oauthbearer.method` is set to "oidc" and JWT assertion is needed. <br>*Type: string*
118+
sasl.oauthbearer.assertion.claim.jti.include | * | true, false | false | low | JWT ID claim. When set to `true`a random UUID is generated. Only used when `sasl.oauthbearer.method` is set to "oidc" and JWT assertion is needed. <br>*Type: boolean*
119+
sasl.oauthbearer.assertion.claim.nbf.seconds | * | 0 .. 2147483647 | 60 | low | Assertion not before time in seconds. Only used when `sasl.oauthbearer.method` is set to "oidc" and JWT assertion is needed. <br>*Type: integer*
120+
sasl.oauthbearer.assertion.claim.sub | * | | | low | JWT subject claim. Only used when `sasl.oauthbearer.method` is set to "oidc" and JWT assertion is needed. <br>*Type: string*
121+
sasl.oauthbearer.assertion.jwt.template.file | * | | | low | Path to the JWT template file. Only used when `sasl.oauthbearer.method` is set to "oidc" and JWT assertion is needed. <br>*Type: string*
107122
plugin.library.paths | * | | | low | List of plugin libraries to load (; separated). The library search path is platform dependent (see dlopen(3) for Unix and LoadLibrary() for Windows). If no filename extension is specified the platform-specific extension (such as .dll or .so) will be appended automatically. <br>*Type: string*
108123
interceptors | * | | | low | Interceptors added through rd_kafka_conf_interceptor_add_..() and any configuration handled by interceptors. <br>*Type: see dedicated API*
109124
group.id | C | | | high | Client group id string. All clients sharing the same group.id belong to the same group. <br>*Type: string*

INTRODUCTION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2137,6 +2137,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
21372137
| KIP-951 - Leader discovery optimisations for the client | 3.7.0 | Supported |
21382138
| KIP-1082 - Require Client-Generated IDs over the ConsumerGroupHeartbeat | 4.0.0 | Supported |
21392139
| KIP-1102 - Enable clients to rebootstrap based on timeout or error code | 4.0.0 | Supported |
2140+
| KIP-1139 - Add support for OAuth jwt-bearer grant type | 4.1.0 (WIP) | Supported |
21402141

21412142

21422143

src/rd.h

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,4 +438,110 @@ typedef struct rd_chariov_s {
438438
size_t size;
439439
} rd_chariov_t;
440440

441+
/**
442+
* @brief Read the file at \p file_path in binary mode and return its contents.
443+
* The returned buffer is NULL-terminated,
444+
* the size parameter will contain the actual file size.
445+
*
446+
* @param file_path Path to the file to read.
447+
* @param size Pointer to store the file size (optional).
448+
* @param max_size Maximum file size to read (0 for no limit) (optional).
449+
*
450+
* @returns Newly allocated buffer containing the file contents.
451+
* NULL on error (file not found, too large, etc).
452+
*
453+
* @remark The returned pointer ownership is transferred to the caller.
454+
*
455+
* @locality Any thread
456+
*/
457+
static RD_INLINE RD_UNUSED char *
458+
rd_file_read(const char *file_path, size_t *size, size_t max_size) {
459+
FILE *file;
460+
char *buf = NULL;
461+
size_t file_size;
462+
size_t read_size;
463+
if (!size)
464+
size = &read_size;
465+
466+
#ifndef _WIN32
467+
file = fopen(file_path, "rb");
468+
#else
469+
file = NULL;
470+
errno = fopen_s(&file, file_path, "rb");
471+
#endif
472+
if (!file)
473+
return NULL;
474+
475+
if (fseek(file, 0, SEEK_END) != 0)
476+
goto err;
477+
478+
file_size = (size_t)ftell(file);
479+
if (file_size < 0)
480+
goto err;
481+
482+
if (fseek(file, 0, SEEK_SET) != 0)
483+
goto err;
484+
485+
/* Check if file is too large */
486+
if (max_size > 0 && file_size > max_size)
487+
goto err;
488+
489+
/* Allocate buffer with extra byte for NULL terminator */
490+
buf = (char *)rd_malloc(file_size + 1);
491+
read_size = fread(buf, 1, file_size, file);
492+
493+
if (read_size != file_size)
494+
goto err;
495+
496+
/* NULL terminate the buffer */
497+
buf[file_size] = '\0';
498+
*size = file_size;
499+
fclose(file);
500+
return buf;
501+
err:
502+
fclose(file);
503+
if (buf)
504+
rd_free(buf);
505+
return NULL;
506+
}
507+
508+
static RD_INLINE RD_UNUSED FILE *
509+
rd_file_mkstemp(const char *prefix,
510+
const char *mode,
511+
char *tempfile_path_out,
512+
size_t tempfile_path_out_size) {
513+
FILE *tempfile;
514+
515+
#ifdef _WIN32
516+
char tempfolder_path[MAX_PATH];
517+
char tempfile_path[MAX_PATH];
518+
if (!GetTempPathA(MAX_PATH, tempfolder_path))
519+
return NULL; /* Failed to get temp folder path */
520+
521+
522+
if (!GetTempFileNameA(tempfolder_path, "TMP", 1, tempfile_path))
523+
return NULL; /* Failed to create temp file name */
524+
525+
tempfile = fopen(tempfile_path, mode);
526+
#else
527+
int tempfile_fd;
528+
char tempfile_path[512];
529+
rd_snprintf(tempfile_path, sizeof(tempfile_path), "/tmp/%sXXXXXX",
530+
prefix);
531+
tempfile_fd = mkstemp(tempfile_path);
532+
if (tempfile_fd < 0)
533+
return NULL;
534+
535+
tempfile = fdopen(tempfile_fd, mode);
536+
#endif
537+
538+
if (!tempfile)
539+
return NULL;
540+
541+
if (tempfile_path_out)
542+
rd_snprintf(tempfile_path_out, tempfile_path_out_size, "%s",
543+
tempfile_path);
544+
return tempfile;
545+
}
546+
441547
#endif /* _RD_H_ */

src/rdbase64.c

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,37 @@ char *rd_base64_encode_str(const rd_chariov_t *in) {
119119
return out.ptr;
120120
}
121121

122+
/**
123+
* @brief Base64 encode binary input \p in and return a newly allocated,
124+
* base64-encoded string with URL-safe characters.
125+
* @returns a newly allocated, base64-encoded string or NULL in case of some
126+
* issue with the conversion or the conversion is not supported.
127+
*
128+
* @remark Returned string must be freed after use.
129+
*/
130+
char *rd_base64_encode_str_urlsafe(const rd_chariov_t *in) {
131+
rd_chariov_t out;
132+
char *p;
133+
rd_base64_encode(in, &out);
134+
135+
/* Replace + with - and / with _ */
136+
for (p = out.ptr; *p; p++) {
137+
if (*p == '+')
138+
*p = '-';
139+
else if (*p == '/')
140+
*p = '_';
141+
}
142+
143+
/* Remove padding '=' characters */
144+
int newlen = strlen(out.ptr);
145+
while (newlen > 0 && out.ptr[newlen - 1] == '=') {
146+
out.ptr[newlen - 1] = '\0';
147+
newlen--;
148+
}
149+
150+
out.size = newlen;
151+
return out.ptr;
152+
}
122153

123154
/**
124155
* @brief Base64 decode input string \p in. Ignores leading and trailing
@@ -166,4 +197,4 @@ int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) {
166197
#else
167198
return -2;
168199
#endif
169-
}
200+
}

src/rdbase64.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ void rd_base64_encode(const rd_chariov_t *in, rd_chariov_t *out);
3636

3737
char *rd_base64_encode_str(const rd_chariov_t *in);
3838

39+
char *rd_base64_encode_str_urlsafe(const rd_chariov_t *in);
40+
3941
int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out);
4042

4143
#endif /* _RDBASE64_H_ */

src/rdkafka.c

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2431,11 +2431,22 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
24312431
#if WITH_OAUTHBEARER_OIDC
24322432
if (rk->rk_conf.sasl.oauthbearer.method ==
24332433
RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
2434-
!rk->rk_conf.sasl.oauthbearer.token_refresh_cb)
2435-
rd_kafka_conf_set_oauthbearer_token_refresh_cb(
2436-
&rk->rk_conf, rd_kafka_oidc_token_refresh_cb);
2434+
!rk->rk_conf.sasl.oauthbearer.token_refresh_cb) {
2435+
/* Use JWT bearer */
2436+
if (rk->rk_conf.sasl.oauthbearer.grant_type ==
2437+
RD_KAFKA_SASL_OAUTHBEARER_GRANT_TYPE_CLIENT_CREDENTIALS) {
2438+
rd_kafka_conf_set_oauthbearer_token_refresh_cb(
2439+
&rk->rk_conf,
2440+
rd_kafka_oidc_token_client_credentials_refresh_cb);
2441+
} else {
2442+
rd_kafka_conf_set_oauthbearer_token_refresh_cb(
2443+
&rk->rk_conf,
2444+
rd_kafka_oidc_token_jwt_bearer_refresh_cb);
2445+
}
2446+
}
24372447
#endif
24382448

2449+
24392450
rk->rk_controllerid = -1;
24402451

24412452
/* Admin client defaults */
@@ -5360,7 +5371,7 @@ void rd_kafka_Uuid_destroy(rd_kafka_Uuid_t *uuid) {
53605371
*
53615372
* @remark Must be freed after use.
53625373
*/
5363-
const char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid) {
5374+
char *rd_kafka_Uuid_str(const rd_kafka_Uuid_t *uuid) {
53645375
int i, j;
53655376
unsigned char bytes[16];
53665377
char *ret = rd_calloc(37, sizeof(*ret));

0 commit comments

Comments
 (0)