Enabled idempotence for the producer
authorKai Moritz <kai@juplo.de>
Sat, 30 Jan 2021 15:45:34 +0000 (16:45 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 16 May 2021 21:28:03 +0000 (23:28 +0200)
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java

index 49f777f..09c8789 100644 (file)
@@ -17,8 +17,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PreDestroy;
 
 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 import javax.annotation.PreDestroy;
 
 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
 
 
 @Component
 
 
 @Component
@@ -43,6 +42,7 @@ public class OutboxProducer
     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
     props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
 
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;
 
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;