Commit 5cdc6ec

anchitj and emasab authored
Improve HTTPS CA certificates configuration (#5107)
Use `rd_kafka_ssl_probe_and_set_default_ca_location` and set the same CA search policy as Kafka SSL:
- on Windows, try to load from the certificate store, or fall back to paths when using msys2.
- on macOS, use `probe` by default.
- on Linux, use `probe` when configured; otherwise check the default path for dynamically linked OpenSSL, or fall back to `probe` when statically linked.

Co-authored-by: Emanuele Sabellico <[email protected]>
1 parent 606d0e4 commit 5cdc6ec

13 files changed, +452 -65 lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -9,6 +9,9 @@ librdkafka v2.11.0 is a feature release:
   initialized (#4718).
 * Features BROKER_BALANCED_CONSUMER and SASL_GSSAPI don't depend on
   JoinGroup v0 anymore, missing in AK 4.0 and CP 8.0 (#5131).
+* Improve HTTPS CA certificates configuration by probing several paths
+  when OpenSSL is statically linked and providing a way to customize their location
+  or value (#).
 
 
 ## Fixes

CONFIGURATION.md

Lines changed: 2 additions & 0 deletions
@@ -73,6 +73,8 @@ ssl.certificate.location | * | |
 ssl.certificate.pem | * | | | low | Client's public key string (PEM format) used for authentication. <br>*Type: string*
 ssl_certificate | * | | | low | Client's public key as set by rd_kafka_conf_set_ssl_cert() <br>*Type: see dedicated API*
 ssl.ca.location | * | | | low | File or directory path to CA certificate(s) for verifying the broker's key. Defaults: On Windows the system's CA certificates are automatically looked up in the Windows Root certificate store. On Mac OSX this configuration defaults to `probe`. It is recommended to install openssl using Homebrew, to provide CA certificates. On Linux install the distribution's ca-certificates package. If OpenSSL is statically linked or `ssl.ca.location` is set to `probe` a list of standard paths will be probed and the first one found will be used as the default CA certificate location path. If OpenSSL is dynamically linked the OpenSSL library's default path will be used (see `OPENSSLDIR` in `openssl version -a`). <br>*Type: string*
+https.ca.location | * | | | low | File or directory path to CA certificate(s) for verifying HTTPS endpoints, like `sasl.oauthbearer.token.endpoint.url` used for OAUTHBEARER/OIDC authentication. Mutually exclusive with `https.ca.pem`. Defaults: On Windows the system's CA certificates are automatically looked up in the Windows Root certificate store. On Mac OSX this configuration defaults to `probe`. It is recommended to install openssl using Homebrew, to provide CA certificates. On Linux install the distribution's ca-certificates package. If OpenSSL is statically linked or `https.ca.location` is set to `probe` a list of standard paths will be probed and the first one found will be used as the default CA certificate location path. If OpenSSL is dynamically linked the OpenSSL library's default path will be used (see `OPENSSLDIR` in `openssl version -a`). <br>*Type: string*
+https.ca.pem | * | | | low | CA certificate string (PEM format) for verifying HTTPS endpoints. Mutually exclusive with `https.ca.location`. Optional: see `https.ca.location`. <br>*Type: string*
 ssl.ca.pem | * | | | low | CA certificate string (PEM format) for verifying the broker's key. <br>*Type: string*
 ssl_ca | * | | | low | CA certificate as set by rd_kafka_conf_set_ssl_cert() <br>*Type: see dedicated API*
 ssl.ca.certificate.stores | * | | Root | low | Comma-separated list of Windows Certificate stores to load CA certificates from. Certificates will be loaded in the same order as stores are specified. If no certificates can be loaded from any of the specified stores an error is logged and the OpenSSL library's default CA location is used instead. Store names are typically one or more of: MY, Root, Trust, CA. <br>*Type: string*
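
The table rows above document `https.ca.location` and `https.ca.pem` as mutually exclusive, with `probe` as a special value of `https.ca.location`. As a minimal sketch of how an application might pre-check its own settings before passing them to the client, the helper below classifies a pair of values. The names `https_ca_source_t`, `https_ca_source` and `https_ca_source_name` are hypothetical and are not part of the librdkafka API; treating an empty string as "unset" is also an assumption of this sketch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical classification used only in this sketch. */
typedef enum {
        HTTPS_CA_DEFAULT,  /* neither property set: platform default applies */
        HTTPS_CA_PROBE,    /* https.ca.location=probe */
        HTTPS_CA_LOCATION, /* https.ca.location=<file or directory path> */
        HTTPS_CA_PEM,      /* https.ca.pem=<PEM string> */
        HTTPS_CA_CONFLICT  /* both set: documented as mutually exclusive */
} https_ca_source_t;

/* Classify the two property values; NULL or "" is treated as unset
 * (an assumption of this sketch). */
static https_ca_source_t https_ca_source(const char *ca_location,
                                         const char *ca_pem) {
        int has_location = ca_location != NULL && *ca_location != '\0';
        int has_pem      = ca_pem != NULL && *ca_pem != '\0';

        if (has_location && has_pem)
                return HTTPS_CA_CONFLICT;
        if (has_location)
                return strcmp(ca_location, "probe") == 0 ? HTTPS_CA_PROBE
                                                         : HTTPS_CA_LOCATION;
        if (has_pem)
                return HTTPS_CA_PEM;
        return HTTPS_CA_DEFAULT;
}

/* Human-readable name, e.g. for logging the chosen source. */
static const char *https_ca_source_name(https_ca_source_t src) {
        switch (src) {
        case HTTPS_CA_DEFAULT:
                return "default";
        case HTTPS_CA_PROBE:
                return "probe";
        case HTTPS_CA_LOCATION:
                return "location";
        case HTTPS_CA_PEM:
                return "pem";
        case HTTPS_CA_CONFLICT:
                return "conflict";
        }
        assert(!"unknown https_ca_source_t value");
        return "unknown";
}
```

An application could reject its configuration early when the result is `HTTPS_CA_CONFLICT`, instead of relying on the client to report the conflict.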

INTRODUCTION.md

Lines changed: 34 additions & 1 deletion
@@ -1197,7 +1197,40 @@ To use this authentication method the client needs to be configured as follows:
   provided to the broker. A comma-separated list of key=value pairs.
   For example:
   `supportFeatureX=true,organizationId=sales-emea`
-
+* `https.ca.location` - (optional) to customize the CA certificates
+  location.
+
+* `https.ca.pem` - (optional) to provide the CA certificates as a PEM string.
+
+##### JWT bearer grant type (KIP-1139)
+
+This KIP adds support for the `client_credentials, urn:ietf:params:oauth:grant-type:jwt-bearer`
+grant type, with a series of properties to be used for creating a JWT assertion
+sent to the token endpoint. The authenticated principal corresponds to the
+`sub` claim returned by token endpoint, `sasl.oauthbearer.client.id` and
+`sasl.oauthbearer.client.secret` aren't used. Required JWT claims must be set
+either through the template or with the `claim` properties.
+
+* `sasl.oauthbearer.grant.type` - changes the default grant type, set it to
+  `urn:ietf:params:oauth:grant-type:jwt-bearer`.
+* `sasl.oauthbearer.assertion.algorithm` - JWT algorithm defaults to `RS256`.
+* `sasl.oauthbearer.assertion.private.key.file` - a private key file for signing
+  the token.
+* `sasl.oauthbearer.assertion.private.key.passphrase` - (optional) passphrase for the key if encrypted.
+* `sasl.oauthbearer.assertion.private.key.pem` - alternatively to the key file
+  it's possible to pass the private key as a string.
+* `sasl.oauthbearer.assertion.file` - (optional) assertion file: with this property all other
+  assertion related fields are ignored and the assertion is read from this file
+  that should be periodically updated.
+* `sasl.oauthbearer.assertion.jwt.template.file` - (optional) template file: a template containing
+  a default `header` and `payload` that can be overwritten by the `claim` properties.
+* `sasl.oauthbearer.assertion.claim.aud`,
+  `sasl.oauthbearer.assertion.claim.exp.seconds`,
+  `sasl.oauthbearer.assertion.claim.iss`,
+  `sasl.oauthbearer.assertion.claim.jti.include`,
+  `sasl.oauthbearer.assertion.claim.sub` - (optional) the `claim` properties:
+  it's possible to dynamically customize the JWT claims with these or to
+  skip the template file and use only these properties.
 
 <a name="sparse-connections"></a>
 #### Sparse connections
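
The JWT bearer section in the INTRODUCTION.md hunk above describes sending a signed assertion to the token endpoint with the `urn:ietf:params:oauth:grant-type:jwt-bearer` grant type. As an illustration only, the sketch below builds the form-encoded request body that RFC 7523 describes for this grant (`grant_type` plus `assertion`). It is not taken from librdkafka: `percent_encode` and `build_jwt_bearer_body` are hypothetical helpers, and the assertion value used with them is a placeholder, not a real JWT.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define JWT_BEARER_GRANT "urn:ietf:params:oauth:grant-type:jwt-bearer"

/* Percent-encode src into dst, leaving RFC 3986 unreserved characters as-is.
 * Returns the encoded length, or -1 if dst is too small. */
static int percent_encode(const char *src, char *dst, size_t dst_size) {
        static const char hex[] = "0123456789ABCDEF";
        size_t of = 0;

        assert(src != NULL && dst != NULL);
        if (dst_size == 0)
                return -1;

        for (; *src; src++) {
                unsigned char c = (unsigned char)*src;
                int unreserved  = (c >= 'A' && c <= 'Z') ||
                                 (c >= 'a' && c <= 'z') ||
                                 (c >= '0' && c <= '9') || c == '-' ||
                                 c == '.' || c == '_' || c == '~';
                size_t need = unreserved ? 1 : 3;

                /* Always keep room for the terminating NUL. */
                if (of + need >= dst_size)
                        return -1;
                if (unreserved) {
                        dst[of++] = (char)c;
                } else {
                        dst[of++] = '%';
                        dst[of++] = hex[c >> 4];
                        dst[of++] = hex[c & 0x0F];
                }
        }
        dst[of] = '\0';
        return (int)of;
}

/* Build "grant_type=<grant>&assertion=<jwt>" into body.
 * Returns the body length, or -1 if body is too small. */
static int
build_jwt_bearer_body(const char *assertion, char *body, size_t body_size) {
        int of, r;

        of = snprintf(body, body_size, "grant_type=");
        if (of < 0 || (size_t)of >= body_size)
                return -1;

        r = percent_encode(JWT_BEARER_GRANT, body + of, body_size - (size_t)of);
        if (r < 0)
                return -1;
        of += r;

        r = snprintf(body + of, body_size - (size_t)of, "&assertion=");
        if (r < 0 || (size_t)r >= body_size - (size_t)of)
                return -1;
        of += r;

        r = percent_encode(assertion, body + of, body_size - (size_t)of);
        if (r < 0)
                return -1;
        return of + r;
}
```

A compact JWT consists only of base64url characters and dots, so the assertion normally passes through unchanged; only the colons of the grant type are escaped.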

src/rd.h

Lines changed: 31 additions & 0 deletions
@@ -53,6 +53,7 @@
 #include <time.h>
 #include <assert.h>
 #include <limits.h>
+#include <sys/stat.h>
 
 #include "tinycthread.h"
 #include "rdsysqueue.h"
@@ -544,4 +545,34 @@ rd_file_mkstemp(const char *prefix,
         return tempfile;
 }
 
+/**
+ * @brief Retrieve stat for a \p path .
+ *
+ * @param path Path to the file or directory.
+ * @param is_dir Pointer to store if the \p path is a directory (optional).
+ *
+ * @return `rd_true` if the path exists.
+ */
+static RD_INLINE RD_UNUSED rd_bool_t rd_file_stat(const char *path,
+                                                  rd_bool_t *is_dir) {
+#ifdef _WIN32
+        struct _stat st;
+        if (_stat(path, &st) == 0) {
+                if (is_dir)
+                        *is_dir = st.st_mode & S_IFDIR;
+                return rd_true;
+        }
+#else
+        struct stat st;
+        if (stat(path, &st) == 0) {
+                if (is_dir)
+                        *is_dir = S_ISDIR(st.st_mode);
+                return rd_true;
+        }
+#endif
+        if (is_dir)
+                *is_dir = rd_false;
+        return rd_false;
+}
+
 #endif /* _RD_H_ */
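
The `rd_file_stat()` helper added above reports both whether a path exists and whether it is a directory, which is what the HTTPS code needs to choose between a CA directory and a CA bundle file. A minimal POSIX-only sketch of the same idea follows; `ca_path_kind_t` and `ca_path_kind` are hypothetical names for illustration, and the Windows `_stat` branch of the real helper is deliberately left out.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/stat.h>

/* Hypothetical three-way result used only in this sketch. */
typedef enum {
        CA_PATH_MISSING, /* path does not exist: no trusted certificates set */
        CA_PATH_FILE,    /* existing non-directory: treat as a CA bundle file */
        CA_PATH_DIR      /* existing directory: treat as a CA directory */
} ca_path_kind_t;

/* Classify a configured CA location with a single stat() call. */
static ca_path_kind_t ca_path_kind(const char *path) {
        struct stat st;

        assert(path != NULL);
        if (stat(path, &st) != 0)
                return CA_PATH_MISSING;
        return S_ISDIR(st.st_mode) ? CA_PATH_DIR : CA_PATH_FILE;
}
```

In the rdhttp.c change in this commit, the directory case maps to `CURLOPT_CAPATH` and the file case to `CURLOPT_CAINFO`, while a missing path clears both options.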

src/rdhttp.c

Lines changed: 156 additions & 8 deletions
@@ -40,6 +40,10 @@
 #include <curl/curl.h>
 #include "rdhttp.h"
 
+#if WITH_SSL
+#include "rdkafka_ssl.h"
+#endif
+
 /** Maximum response size, increase as necessary. */
 #define RD_HTTP_RESPONSE_SIZE_MAX 1024 * 1024 * 500 /* 500kb */
 
@@ -128,8 +132,143 @@ rd_http_req_write_cb(char *ptr, size_t size, size_t nmemb, void *userdata) {
         return nmemb;
 }
 
-rd_http_error_t *rd_http_req_init(rd_http_req_t *hreq, const char *url) {
+#if WITH_SSL
+/**
+ * @brief Callback function for setting up the SSL_CTX for HTTPS requests.
+ *
+ * This function sets the default CA paths for the SSL_CTX, and if that fails,
+ * it attempts to probe and set a default CA location. If `probe` is forced
+ * it skips the default CA paths and directly probes for CA certificates.
+ *
+ * On Windows, it attempts to load CA root certificates from the
+ * configured Windows certificate stores before falling back to the default.
+ *
+ * @return `CURLE_OK` on success, or `CURLE_SSL_CACERT_BADFILE` on failure.
+ */
+static CURLcode
+rd_http_ssl_ctx_function(CURL *curl, void *sslctx, void *userptr) {
+        SSL_CTX *ctx = (SSL_CTX *)sslctx;
+        rd_kafka_t *rk = (rd_kafka_t *)userptr;
+        int r = -1;
+        rd_bool_t force_probe =
+            !rd_strcmp(rk->rk_conf.https.ca_location, "probe");
+        rd_bool_t use_probe = force_probe;
+
+#if WITH_STATIC_LIB_libcrypto
+        /* We fallback to `probe` when statically linked. */
+        use_probe = rd_true;
+#endif
+
+#ifdef _WIN32
+        /* Attempt to load CA root certificates from the
+         * configured Windows certificate stores. */
+        r = rd_kafka_ssl_win_load_cert_stores(rk, "https", ctx,
+                                              rk->rk_conf.ssl.ca_cert_stores);
+        if (r == 0) {
+                rd_kafka_log(rk, LOG_NOTICE, "CERTSTORE",
+                             "No CA certificates loaded for `https` from "
+                             "Windows certificate stores: "
+                             "falling back to default OpenSSL CA paths");
+                r = -1;
+        } else if (r == -1)
+                rd_kafka_log(rk, LOG_NOTICE, "CERTSTORE",
+                             "Failed to load CA certificates for `https` from "
+                             "Windows certificate stores: "
+                             "falling back to default OpenSSL CA paths");
+
+        if (r != -1) {
+                rd_kafka_dbg(rk, SECURITY, "SSL",
+                             "Successfully loaded CA certificates for `https` "
+                             "from Windows certificate stores");
+                return CURLE_OK; /* Success, CA certs loaded on Windows */
+        }
+#endif
 
+        if (!force_probe) {
+                /* Previous default behavior: use predefined paths set when
+                 * building OpenSSL. */
+                char errstr[512];
+                r = SSL_CTX_set_default_verify_paths(ctx);
+                if (r == 1) {
+                        rd_kafka_dbg(rk, SECURITY, "SSL",
+                                     "SSL_CTX_set_default_verify_paths() "
+                                     "for `https` "
+                                     "succeeded");
+                        return CURLE_OK; /* Success */
+                }
+
+                /* Read error and clear the error stack. */
+                rd_kafka_ssl_error0(rk, NULL, "https", errstr, sizeof(errstr));
+                rd_kafka_dbg(rk, SECURITY, "SSL",
+                             "SSL_CTX_set_default_verify_paths() "
+                             "for `https` "
+                             "failed: %s",
+                             errstr);
+        }
+
+        if (use_probe) {
+                /* We asked for probing or we're using
+                 * a statically linked version of OpenSSL. */
+
+                r = rd_kafka_ssl_probe_and_set_default_ca_location(rk, "https",
+                                                                   ctx);
+                if (r == 0)
+                        return CURLE_OK;
+        }
+
+        return CURLE_SSL_CACERT_BADFILE;
+}
+
+static void rd_http_ssl_configure(rd_kafka_t *rk, CURL *hreq_curl) {
+        rd_bool_t force_probe =
+            !rd_strcmp(rk->rk_conf.https.ca_location, "probe");
+
+        if (!force_probe && rk->rk_conf.https.ca_location) {
+                rd_bool_t is_dir;
+                rd_kafka_dbg(rk, SECURITY, "SSL",
+                             "Setting `https` CA certs from "
+                             "configured location: %s",
+                             rk->rk_conf.https.ca_location);
+                if (rd_file_stat(rk->rk_conf.https.ca_location, &is_dir)) {
+                        if (is_dir) {
+                                curl_easy_setopt(hreq_curl, CURLOPT_CAPATH,
+                                                 rk->rk_conf.https.ca_location);
+                                curl_easy_setopt(hreq_curl, CURLOPT_CAINFO,
+                                                 NULL);
+                        } else {
+                                curl_easy_setopt(hreq_curl, CURLOPT_CAPATH,
+                                                 NULL);
+                                curl_easy_setopt(hreq_curl, CURLOPT_CAINFO,
+                                                 rk->rk_conf.https.ca_location);
+                        }
+                } else {
+                        /* Path doesn't exist, don't set any trusted
+                         * certificate. */
+                        curl_easy_setopt(hreq_curl, CURLOPT_CAINFO, NULL);
+                        curl_easy_setopt(hreq_curl, CURLOPT_CAPATH, NULL);
+                }
+        } else if (!force_probe && rk->rk_conf.https.ca_pem) {
+                struct curl_blob ca_blob = {
+                    .data = rk->rk_conf.https.ca_pem,
+                    .len = strlen(rk->rk_conf.https.ca_pem),
+                    .flags = CURL_BLOB_COPY};
+                rd_kafka_dbg(rk, SECURITY, "SSL",
+                             "Setting `https` CA certs from "
+                             "configured PEM string");
+                curl_easy_setopt(hreq_curl, CURLOPT_CAINFO_BLOB, &ca_blob);
+                /* Only the blob should be set, no default paths. */
+                curl_easy_setopt(hreq_curl, CURLOPT_CAINFO, NULL);
+                curl_easy_setopt(hreq_curl, CURLOPT_CAPATH, NULL);
+        } else {
+                curl_easy_setopt(hreq_curl, CURLOPT_SSL_CTX_FUNCTION,
+                                 rd_http_ssl_ctx_function);
+                curl_easy_setopt(hreq_curl, CURLOPT_SSL_CTX_DATA, rk);
+        }
+}
+#endif
+
+rd_http_error_t *
+rd_http_req_init(rd_kafka_t *rk, rd_http_req_t *hreq, const char *url) {
         memset(hreq, 0, sizeof(*hreq));
 
         hreq->hreq_curl = curl_easy_init();
@@ -157,6 +296,10 @@ rd_http_error_t *rd_http_req_init(rd_http_req_t *hreq, const char *url) {
                          rd_http_req_write_cb);
         curl_easy_setopt(hreq->hreq_curl, CURLOPT_WRITEDATA, (void *)hreq);
 
+#if WITH_SSL
+        rd_http_ssl_configure(rk, hreq->hreq_curl);
+#endif
+
         return NULL;
 }
 
@@ -207,13 +350,14 @@ const char *rd_http_req_get_content_type(rd_http_req_t *hreq) {
  * by calling rd_http_error_destroy(). In case of HTTP error the \p *rbufp
  * may be filled with the error response.
  */
-rd_http_error_t *rd_http_get(const char *url, rd_buf_t **rbufp) {
+rd_http_error_t *
+rd_http_get(rd_kafka_t *rk, const char *url, rd_buf_t **rbufp) {
         rd_http_req_t hreq;
         rd_http_error_t *herr;
 
         *rbufp = NULL;
 
-        herr = rd_http_req_init(&hreq, url);
+        herr = rd_http_req_init(rk, &hreq, url);
         if (unlikely(herr != NULL))
                 return herr;
 
@@ -317,7 +461,7 @@ rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
         size_t len;
         const char *content_type;
 
-        herr = rd_http_req_init(&hreq, url);
+        herr = rd_http_req_init(rk, &hreq, url);
         if (unlikely(herr != NULL))
                 return herr;
 
@@ -382,7 +526,8 @@ rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
  *
  * Same error semantics as rd_http_get().
  */
-rd_http_error_t *rd_http_get_json(const char *url, cJSON **jsonp) {
+rd_http_error_t *
+rd_http_get_json(rd_kafka_t *rk, const char *url, cJSON **jsonp) {
         rd_http_req_t hreq;
         rd_http_error_t *herr;
         rd_slice_t slice;
@@ -393,7 +538,7 @@ rd_http_error_t *rd_http_get_json(const char *url, cJSON **jsonp) {
 
         *jsonp = NULL;
 
-        herr = rd_http_req_init(&hreq, url);
+        herr = rd_http_req_init(rk, &hreq, url);
         if (unlikely(herr != NULL))
                 return herr;
 
@@ -468,19 +613,21 @@ int unittest_http(void) {
         cJSON *json, *jval;
         rd_http_error_t *herr;
         rd_bool_t empty;
+        rd_kafka_t *rk;
 
         if (!base_url || !*base_url)
                 RD_UT_SKIP("RD_UT_HTTP_URL environment variable not set");
 
         RD_UT_BEGIN();
 
+        rk = rd_calloc(1, sizeof(*rk));
         error_url_size = strlen(base_url) + strlen("/error") + 1;
         error_url = rd_alloca(error_url_size);
         rd_snprintf(error_url, error_url_size, "%s/error", base_url);
 
         /* Try the base url first, parse its JSON and extract a key-value. */
         json = NULL;
-        herr = rd_http_get_json(base_url, &json);
+        herr = rd_http_get_json(rk, base_url, &json);
         RD_UT_ASSERT(!herr, "Expected get_json(%s) to succeed, got: %s",
                      base_url, herr->errstr);
 
@@ -500,7 +647,7 @@ int unittest_http(void) {
 
         /* Try the error URL, verify error code. */
         json = NULL;
-        herr = rd_http_get_json(error_url, &json);
+        herr = rd_http_get_json(rk, error_url, &json);
         RD_UT_ASSERT(herr != NULL, "Expected get_json(%s) to fail", error_url);
         RD_UT_ASSERT(herr->code >= 400,
                      "Expected get_json(%s) error code >= "
@@ -514,6 +661,7 @@ int unittest_http(void) {
         if (json)
                 cJSON_Delete(json);
         rd_http_error_destroy(herr);
+        rd_free(rk);
 
         RD_UT_PASS();
 }
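
The fallback order implemented by `rd_http_ssl_ctx_function()` in the hunks above can be summarized as a small decision function. The sketch below is a simplified model rather than the library's code: it replaces the real OpenSSL and Windows certificate store calls with boolean inputs so the ordering can be read and tested in isolation. `ca_env_t`, `ca_result_t` and `resolve_https_ca` are hypothetical names introduced for this illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Where the CA certificates ended up coming from (sketch only). */
typedef enum {
        CA_FROM_WIN_STORE,     /* Windows certificate stores */
        CA_FROM_DEFAULT_PATHS, /* OpenSSL built-in default verify paths */
        CA_FROM_PROBE,         /* probing a list of standard paths */
        CA_NONE                /* nothing worked: request fails CA setup */
} ca_result_t;

/* Inputs that stand in for build flags, configuration and call results. */
typedef struct {
        bool is_windows;        /* built for _WIN32 */
        bool statically_linked; /* OpenSSL statically linked */
        bool force_probe;       /* https.ca.location=probe */
        bool win_store_ok;      /* loading from Windows stores succeeded */
        bool default_paths_ok;  /* default verify paths succeeded */
        bool probe_ok;          /* probing found a CA location */
} ca_env_t;

static ca_result_t resolve_https_ca(const ca_env_t *env) {
        bool use_probe;

        assert(env != NULL);
        /* Probing is used when forced, or as a fallback when static. */
        use_probe = env->force_probe || env->statically_linked;

        /* 1. On Windows the certificate stores are tried first. */
        if (env->is_windows && env->win_store_ok)
                return CA_FROM_WIN_STORE;

        /* 2. Unless probing is forced, try the OpenSSL default paths. */
        if (!env->force_probe && env->default_paths_ok)
                return CA_FROM_DEFAULT_PATHS;

        /* 3. Probe standard locations if requested or statically linked. */
        if (use_probe && env->probe_ok)
                return CA_FROM_PROBE;

        return CA_NONE;
}
```

Note that a dynamically linked build without `probe` configured never reaches step 3, which mirrors the `CURLE_SSL_CACERT_BADFILE` return at the end of the callback.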

src/rdhttp.h

Lines changed: 5 additions & 3 deletions
@@ -42,8 +42,9 @@ typedef struct rd_http_error_s {
 
 void rd_http_error_destroy(rd_http_error_t *herr);
 
-rd_http_error_t *rd_http_get(const char *url, rd_buf_t **rbufp);
-rd_http_error_t *rd_http_get_json(const char *url, cJSON **jsonp);
+rd_http_error_t *rd_http_get(rd_kafka_t *rk, const char *url, rd_buf_t **rbufp);
+rd_http_error_t *
+rd_http_get_json(rd_kafka_t *rk, const char *url, cJSON **jsonp);
 
 void rd_http_global_init(void);
 
@@ -62,7 +63,8 @@ typedef struct rd_http_req_s {
                                                 * write to. */
 } rd_http_req_t;
 
-rd_http_error_t *rd_http_req_init(rd_http_req_t *hreq, const char *url);
+rd_http_error_t *
+rd_http_req_init(rd_kafka_t *rk, rd_http_req_t *hreq, const char *url);
 rd_http_error_t *rd_http_req_perform_sync(rd_http_req_t *hreq);
 rd_http_error_t *rd_http_parse_json(rd_http_req_t *hreq, cJSON **jsonp);
 rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,

src/rdkafka.c

Lines changed: 2 additions & 7 deletions
@@ -5213,13 +5213,8 @@ const char *rd_kafka_get_debug_contexts(void) {
 
 
 int rd_kafka_path_is_dir(const char *path) {
-#ifdef _WIN32
-        struct _stat st;
-        return (_stat(path, &st) == 0 && st.st_mode & S_IFDIR);
-#else
-        struct stat st;
-        return (stat(path, &st) == 0 && S_ISDIR(st.st_mode));
-#endif
+        rd_bool_t is_dir;
+        return rd_file_stat(path, &is_dir) && is_dir;
 }
 
 