Skip to content

Commit

Permalink
Merge pull request #3 from galgus/auto_ack
Browse files Browse the repository at this point in the history
chore: set auto_ack to true
  • Loading branch information
lauverrec authored Sep 29, 2023
2 parents d2f3c8b + 5b1b7e4 commit ac7aed2
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</parent>
<groupId>com.galgus.kafka.connect</groupId>
<artifactId>kafka-connect-rabbitmq</artifactId>
<version>0.2.0</version>
<version>0.3.0</version>
<name>kafka-connect-rabbitmq</name>
<description>A Kafka Connect connector reading and writing data from RabbitMQ.</description>
<url>https://github.com/galgus/kafka-connect-rabbitmq</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
//import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
Expand Down Expand Up @@ -93,7 +93,7 @@ public void start(Map<String, String> settings) {
for (String queue : config.queues) {
try {
log.info("Starting consumer");
this.channel.basicConsume(queue, consumer);
this.channel.basicConsume(queue, true, consumer);
log.info("Setting channel.basicQos({}, {});", config.prefetchCount, config.prefetchGlobal);
this.channel.basicQos(config.prefetchCount, config.prefetchGlobal);
} catch (IOException ex) {
Expand All @@ -103,15 +103,15 @@ public void start(Map<String, String> settings) {

}

@Override
/*@Override
public void commitRecord(SourceRecord record) {
Long deliveryTag = (Long) record.sourceOffset().get("deliveryTag");
try {
this.channel.basicAck(deliveryTag, false);
} catch (IOException e) {
throw new RetriableException(e);
}
}
}*/

@Override
public List<SourceRecord> poll() throws InterruptedException {
Expand Down

0 comments on commit ac7aed2

Please sign in to comment.