Skip to content

Commit 5ed295f

Browse files
Refactor KafkaProducer (#67)
* Refactor `KafkaProducer` Motiviation: * align `KafkaProducer` more with proposed changes to `KafkaConsumer` * `AsyncStream` was not handling `AsyncSequence` termination handling as we wanted it to, so revert back to use `NIOAsyncSequenceProducer` Modifications: * make `KafkaProducer` `final class` instead of `actor` * `KafkaProducer`: use `NIOAsyncSequenceProducer` instead of `AsyncSequence` for better termination handling -> shutdown `KafkaProducer` on termination of the `AsyncSequence` * introduce `StateMachine` to `KafkaProducer` * move internal state of `KafkaProducer` to `KafkaProducer.StateMachine` * remove unused `await` expressions when accessing `KafkaProducer` * update tests * update `README` * * rename KafkaProducer.StateMachine.State.shutDown to .finished * Remove unused awaits * KafkaProducer: move logger out of state * KafkaProducer: rename `killPollLoop` -> `terminatePollLoop` * Fix errors after rebase Modifications: * move `NoBackPressure` struct to `extension` of `NIOAsyncSequenceProducerBackPressureStrategies` * break down duplicate `ShutDownOnTerminate` type into two more specialised types for `KafkaConsumer` and `KafkaProducer` * add missing `config` parameter to `KafkaProducer`'s initialiser * Create wrapper for Kafka topic handle dict Modifications: * create new class `RDKafkaTopicHandles` that wraps a dictionary containing all topic names with their respective `rd_kafka_topic_t` handles * create method `KafkaClient.produce` wrapping the `rd_kafka_produce` method in a Swift way * Own implementation of `rd_kafka_flush()` Modifications: * `KafkaClient`: add new property `outgoingQueueSize` * `KafkaProducer.StateMachine`: add new state `.flushing` * `KafkaProducer.shutdownGracefully()`: * make non-async * remove invocation to `rd_kafka_flush` * set state to `KafkaProducer.StateMachine.State` to `.flushing` * `KafkaProducer` poll loop: * poll as long as `outgoingQueueSize` is > 0 to send out any enqueued `KafkaProducerMessage`s and serve any enqueued callbacks * `KafkaProducerTests`: add test asserting that the `librdkafka` `outq` is still being served after `KafkaProducer.shutdownGracefully` has been invoked as long as there are enqueued items * Review Franz Modifications: * rename `KafkaProducer.shutdownGracefully` to `KafkaProducer.triggerGracefulShutdown` * `KafkaProducer.send` separate error message when in state `.flushing`
1 parent 588af60 commit 5ed295f

File tree

8 files changed

+455
-176
lines changed

8 files changed

+455
-176
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ The `send(_:)` method of `KafkaProducer` returns a message-id that can later be
3030
```swift
3131
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])
3232

33-
let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(
33+
let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
3434
config: config,
3535
logger: .kafkaTest // Your logger here
3636
)
@@ -44,7 +44,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
4444

4545
// Task receiving acknowledgements
4646
group.addTask {
47-
let messageID = try await producer.send(
47+
let messageID = try producer.send(
4848
KafkaProducerMessage(
4949
topic: "topic-name",
5050
value: "Hello, World!"
@@ -56,7 +56,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
5656
}
5757

5858
// Required
59-
await producer.shutdownGracefully()
59+
producer.triggerGracefulShutdown()
6060
}
6161
}
6262
```

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,47 @@ final class KafkaClient {
4242
rd_kafka_destroy(kafkaHandle)
4343
}
4444

45+
/// Produce a message to the Kafka cluster.
46+
///
47+
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
48+
/// - Parameter newMessageID: ID that was assigned to the `message`.
49+
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
50+
/// - Parameter topicHandles: Topic handles that this client uses to produce new messages
51+
func produce(
52+
message: KafkaProducerMessage,
53+
newMessageID: UInt,
54+
topicConfig: KafkaTopicConfiguration,
55+
topicHandles: RDKafkaTopicHandles
56+
) throws {
57+
let keyBytes: [UInt8]?
58+
if var key = message.key {
59+
keyBytes = key.readBytes(length: key.readableBytes)
60+
} else {
61+
keyBytes = nil
62+
}
63+
64+
let responseCode = try message.value.withUnsafeReadableBytes { valueBuffer in
65+
return try topicHandles.withTopicHandlePointer(topic: message.topic, topicConfig: topicConfig) { topicHandle in
66+
// Pass message over to librdkafka where it will be queued and sent to the Kafka Cluster.
67+
// Returns 0 on success, error code otherwise.
68+
return rd_kafka_produce(
69+
topicHandle,
70+
message.partition.rawValue,
71+
RD_KAFKA_MSG_F_COPY,
72+
UnsafeMutableRawPointer(mutating: valueBuffer.baseAddress),
73+
valueBuffer.count,
74+
keyBytes,
75+
keyBytes?.count ?? 0,
76+
UnsafeMutableRawPointer(bitPattern: newMessageID)
77+
)
78+
}
79+
}
80+
81+
guard responseCode == 0 else {
82+
throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
83+
}
84+
}
85+
4586
/// Polls the Kafka client for events.
4687
///
4788
/// Events will cause application-provided callbacks to be called.
@@ -210,10 +251,19 @@ final class KafkaClient {
210251
}
211252
}
212253

254+
/// Returns `true` if the underlying `librdkafka` consumer is closed.
213255
var isConsumerClosed: Bool {
214256
rd_kafka_consumer_closed(self.kafkaHandle) == 1
215257
}
216258

259+
/// Returns the current out queue length.
260+
///
261+
/// This means the number of producer messages that wait to be sent + the number of any
262+
/// callbacks that are waiting to be executed by invoking `rd_kafka_poll`.
263+
var outgoingQueueSize: Int32 {
264+
return rd_kafka_outq_len(self.kafkaHandle)
265+
}
266+
217267
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
218268
/// - Warning: Do not escape the pointer from the closure for later use.
219269
/// - Parameter body: The closure will use the Kafka handle pointer.

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,15 @@ import Logging
1717
import NIOConcurrencyHelpers
1818
import NIOCore
1919

20-
// MARK: - NoBackPressure
21-
22-
/// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
23-
struct NoBackPressure: NIOAsyncSequenceProducerBackPressureStrategy {
24-
func didYield(bufferDepth: Int) -> Bool { true }
25-
func didConsume(bufferDepth: Int) -> Bool { true }
26-
}
27-
28-
// MARK: - ShutDownOnTerminate
20+
// MARK: - KafkaConsumerShutDownOnTerminate
2921

3022
/// `NIOAsyncSequenceProducerDelegate` that terminates the shuts the consumer down when
3123
/// `didTerminate()` is invoked.
32-
struct ShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
24+
internal struct KafkaConsumerShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
3325
let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
3426
}
3527

36-
extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
28+
extension KafkaConsumerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
3729
func produceMore() {
3830
// No back pressure
3931
return
@@ -68,7 +60,8 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
6860
/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
6961
public struct KafkaConsumerMessages: AsyncSequence {
7062
public typealias Element = KafkaConsumerMessage
71-
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, NoBackPressure, ShutdownOnTerminate>
63+
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
64+
typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaConsumerShutdownOnTerminate>
7265
let wrappedSequence: WrappedSequence
7366

7467
/// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
@@ -91,8 +84,8 @@ public struct KafkaConsumerMessages: AsyncSequence {
9184
public final class KafkaConsumer {
9285
typealias Producer = NIOAsyncSequenceProducer<
9386
KafkaConsumerMessage,
94-
NoBackPressure,
95-
ShutdownOnTerminate
87+
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
88+
KafkaConsumerShutdownOnTerminate
9689
>
9790
/// The configuration object of the consumer client.
9891
private var config: KafkaConsumerConfiguration
@@ -123,8 +116,8 @@ public final class KafkaConsumer {
123116

124117
let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
125118
elementType: KafkaConsumerMessage.self,
126-
backPressureStrategy: NoBackPressure(),
127-
delegate: ShutdownOnTerminate(stateMachine: self.stateMachine)
119+
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
120+
delegate: KafkaConsumerShutdownOnTerminate(stateMachine: self.stateMachine)
128121
)
129122

130123
self.messages = KafkaConsumerMessages(

0 commit comments

Comments
 (0)