diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index c5ec5226..b5914096 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -492,6 +492,28 @@ public final class KafkaConsumer: Sendable, Service { /// - message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if committing failed. public func scheduleCommit(_ message: KafkaConsumerMessage) throws { + try scheduleCommit( + topic: message.topic, + partition: message.partition, + offset: message.offset) + } + + /// Mark all messages up to the passed message in the topic as read. + /// Schedules a commit and returns immediately. + /// Any errors encountered after scheduling the commit will be discarded. + /// + /// This method is only used for manual offset management. + /// + /// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default). + /// + /// - Parameters: + /// - topic: Topic where the message that should be marked as read resides. + /// - partition: Partition where the message that should be marked as read resides. + /// - offset: Offset of the message that shall be marked as read. + /// - Throws: A ``KafkaError`` if committing failed. + public func scheduleCommit(topic: String, + partition: KafkaPartition, + offset: KafkaOffset) throws { let action = self.stateMachine.withLockedValue { $0.commit() } switch action { case .throwClosedError: @@ -500,8 +522,10 @@ public final class KafkaConsumer: Sendable, Service { guard self.configuration.isAutoCommitEnabled == false else { throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false") } - - try client.scheduleCommit(message) + try client.scheduleCommit( + topic: topic, + partition: partition, + offset: offset) } } @@ -521,6 +545,26 @@ public final class KafkaConsumer: Sendable, Service { /// - message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if committing failed. public func commit(_ message: KafkaConsumerMessage) async throws { + try await commit(topic: message.topic, + partition: message.partition, + offset: message.offset) + } + + /// Mark all messages up to the passed message in the topic as read. + /// Awaits until the commit succeeds or an error is encountered. + /// + /// This method is only used for manual offset management. + /// + /// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default). + /// + /// - Parameters: + /// - topic: Topic where the message that should be marked as read resides. + /// - partition: Partition where the message that should be marked as read resides. + /// - offset: Offset of the message that shall be marked as read. + /// - Throws: A ``KafkaError`` if committing failed. + public func commit(topic: String, + partition: KafkaPartition, + offset: KafkaOffset) async throws { let action = self.stateMachine.withLockedValue { $0.commit() } switch action { case .throwClosedError: @@ -530,10 +574,12 @@ public final class KafkaConsumer: Sendable, Service { throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false") } - try await client.commit(message) + try await client.commit(topic: topic, + partition: partition, + offset: offset) } } - + /// This function is used to gracefully shut down a Kafka consumer client. /// /// - Note: Invoking this function is not always needed as the ``KafkaConsumer`` diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 014a3e8c..b9146286 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -526,41 +526,51 @@ final class RDKafkaClient: Sendable { } } - /// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka. + /// Non-blocking "fire-and-forget" commit of a `topic`, `partition`, and `offset` to Kafka. /// Schedules a commit and returns immediately. /// Any errors encountered after scheduling the commit will be discarded. /// - /// - Parameter message: Last received message that shall be marked as read. + /// - Parameter topic: Topic to commit to + /// - Parameter partition: Partition to commit to + /// - Parameter offset: Offset to commit /// - Throws: A ``KafkaError`` if scheduling the commit failed. - func scheduleCommit(_ message: KafkaConsumerMessage) throws { - // The offset committed is always the offset of the next requested message. - // Thus, we increase the offset of the current message by one before committing it. - // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 - let changesList = RDKafkaTopicPartitionList() - changesList.setOffset( - topic: message.topic, - partition: message.partition, - offset: Int64(message.offset.rawValue + 1) - ) - - let error = changesList.withListPointer { listPointer in - return rd_kafka_commit( - self.kafkaHandle, - listPointer, - 1 // async = true + func scheduleCommit( + topic: String, + partition: KafkaPartition, + offset: KafkaOffset) throws { + // The offset committed is always the offset of the next requested message. + // Thus, we increase the offset of the current message by one before committing it. + // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 + let changesList = RDKafkaTopicPartitionList() + changesList.setOffset( + topic: topic, + partition: partition, + offset: Int64(offset.rawValue + 1) ) - } - if error != RD_KAFKA_RESP_ERR_NO_ERROR { - throw KafkaError.rdKafkaError(wrapping: error) - } + let error = changesList.withListPointer { listPointer in + return rd_kafka_commit( + self.kafkaHandle, + listPointer, + 1 // async = true + ) + } + + if error != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: error) + } } /// Non-blocking **awaitable** commit of a `message`'s offset to Kafka. /// - /// - Parameter message: Last received message that shall be marked as read. + /// - Parameter topic: Topic to commit to + /// - Parameter partition: Partition to commit to + /// - Parameter offset: Offset to commit /// - Throws: A ``KafkaError`` if the commit failed. - func commit(_ message: KafkaConsumerMessage) async throws { + func commit( + topic: String, + partition: KafkaPartition, + offset: KafkaOffset) async throws { // Declare captured closure outside of withCheckedContinuation. // We do that because do an unretained pass of the captured closure to // librdkafka which means we have to keep a reference to the closure @@ -577,9 +587,9 @@ final class RDKafkaClient: Sendable { // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 let changesList = RDKafkaTopicPartitionList() changesList.setOffset( - topic: message.topic, - partition: message.partition, - offset: Int64(message.offset.rawValue + 1) + topic: topic, + partition: partition, + offset: Int64(offset.rawValue + 1) ) // Unretained pass because the reference that librdkafka holds to capturedClosure @@ -597,7 +607,7 @@ final class RDKafkaClient: Sendable { } } } - + /// Flush any outstanding produce requests. /// /// - Parameters: