Skip to content

Commit 8bf0a67

Browse files
authored
KIP 848: Added support for DescribeConsumerGroup for consumer protocol groups (#4941)
1 parent 13b3c06 commit 8bf0a67

File tree

10 files changed

+541
-91
lines changed

10 files changed

+541
-91
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ librdkafka v2.10.0 is a feature release:
3939
leader change and offset validation (#4970).
4040
* Fix the Nagle algorithm (TCP_NODELAY) on broker sockets to not be enabled
4141
by default (#4986).
42+
* [KIP-848] `rd_kafka_DescribeConsumerGroups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to NULL. (#4922).
43+
* [KIP-848] Group Config is now supported in AlterConfigs, IncrementalAlterConfigs and DescribeConfigs. (#4939).
4244

4345

4446
## Fixes
@@ -160,7 +162,6 @@ librdkafka v2.10.0 is a feature release:
160162

161163
librdkafka v2.8.0 is a maintenance release:
162164

163-
* Extend Config Apis to support group config (#4939).
164165
* Socket options are now all set before connection (#4893).
165166
* Client certificate chain is now sent when using `ssl.certificate.pem`
166167
or `ssl_certificate` or `ssl.keystore.location` (#4894).

examples/describe_consumer_groups.c

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,31 @@ print_group_member_info(const rd_kafka_MemberDescription_t *member) {
164164
rd_kafka_MemberDescription_host(member));
165165
const rd_kafka_MemberAssignment_t *assignment =
166166
rd_kafka_MemberDescription_assignment(member);
167-
const rd_kafka_topic_partition_list_t *topic_partitions =
167+
const rd_kafka_topic_partition_list_t *assigned_topic_partitions =
168168
rd_kafka_MemberAssignment_partitions(assignment);
169-
if (!topic_partitions) {
169+
const rd_kafka_MemberAssignment_t *target_assignment =
170+
rd_kafka_MemberDescription_target_assignment(member);
171+
const rd_kafka_topic_partition_list_t *target_topic_partitions =
172+
target_assignment
173+
? rd_kafka_MemberAssignment_partitions(target_assignment)
174+
: NULL;
175+
if (!assigned_topic_partitions) {
170176
printf(" No assignment\n");
171-
} else if (topic_partitions->cnt == 0) {
177+
} else if (assigned_topic_partitions->cnt == 0) {
172178
printf(" Empty assignment\n");
173179
} else {
174180
printf(" Assignment:\n");
175-
print_partition_list(stdout, topic_partitions, 0, " ");
181+
print_partition_list(stdout, assigned_topic_partitions, 0,
182+
" ");
183+
}
184+
if (!target_topic_partitions) {
185+
printf(" No target assignment\n");
186+
} else if (target_topic_partitions->cnt == 0) {
187+
printf(" Empty target assignment\n");
188+
} else {
189+
printf(" Target assignment:\n");
190+
print_partition_list(stdout, target_topic_partitions, 0,
191+
" ");
176192
}
177193
}
178194

@@ -194,6 +210,8 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) {
194210
rd_kafka_ConsumerGroupDescription_partition_assignor(group);
195211
rd_kafka_consumer_group_state_t state =
196212
rd_kafka_ConsumerGroupDescription_state(group);
213+
rd_kafka_consumer_group_type_t type =
214+
rd_kafka_ConsumerGroupDescription_type(group);
197215
authorized_operations =
198216
rd_kafka_ConsumerGroupDescription_authorized_operations(
199217
group, &authorized_operations_cnt);
@@ -212,9 +230,10 @@ static void print_group_info(const rd_kafka_ConsumerGroupDescription_t *group) {
212230
rd_kafka_Node_port(coordinator));
213231
}
214232
printf(
215-
"Group \"%s\", partition assignor \"%s\", "
216-
" state %s%s, with %" PRId32 " member(s)\n",
233+
"Group \"%s\", partition assignor \"%s\", type \"%s\""
234+
" state \"%s\"%s, with %" PRId32 " member(s)\n",
217235
group_id, partition_assignor,
236+
rd_kafka_consumer_group_type_name(type),
218237
rd_kafka_consumer_group_state_name(state), coordinator_desc,
219238
member_cnt);
220239
for (j = 0; j < authorized_operations_cnt; j++) {

src/rdkafka.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1690,6 +1690,7 @@ static void rd_kafka_stats_emit_broker_reqs(struct _stats_emit *st,
16901690
[RD_KAFKAP_AlterClientQuotas] = rd_true,
16911691
[RD_KAFKAP_DescribeUserScramCredentials] = rd_true,
16921692
[RD_KAFKAP_AlterUserScramCredentials] = rd_true,
1693+
[RD_KAFKAP_ConsumerGroupDescribe] = rd_true,
16931694
}};
16941695
int i;
16951696
int cnt = 0;

src/rdkafka.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8900,6 +8900,17 @@ RD_EXPORT
89008900
const rd_kafka_Node_t *rd_kafka_ConsumerGroupDescription_coordinator(
89018901
const rd_kafka_ConsumerGroupDescription_t *grpdesc);
89028902

8903+
/**
8904+
* @brief Gets type for the \p grpdesc group.
8905+
*
8906+
* @param grpdesc The group description.
8907+
*
8908+
* @return A group type.
8909+
*/
8910+
RD_EXPORT
8911+
rd_kafka_consumer_group_type_t rd_kafka_ConsumerGroupDescription_type(
8912+
const rd_kafka_ConsumerGroupDescription_t *grpdesc);
8913+
89038914
/**
89048915
* @brief Gets the members count of \p grpdesc group.
89058916
*
@@ -9012,6 +9023,21 @@ RD_EXPORT
90129023
const rd_kafka_topic_partition_list_t *rd_kafka_MemberAssignment_partitions(
90139024
const rd_kafka_MemberAssignment_t *assignment);
90149025

9026+
/**
9027+
* @brief Gets target assignment of \p member.
9028+
*
9029+
* @param member The group member.
9030+
*
9031+
* @return The target assignment for `consumer` group types.
9032+
* Returns NULL for the `classic` group types.
9033+
*
9034+
* @remark The lifetime of the returned memory is the same
9035+
* as the lifetime of the \p member object.
9036+
*/
9037+
RD_EXPORT
9038+
const rd_kafka_MemberAssignment_t *rd_kafka_MemberDescription_target_assignment(
9039+
const rd_kafka_MemberDescription_t *member);
9040+
90159041
/**@}*/
90169042

90179043
/**

0 commit comments

Comments
 (0)