Switched submodule-path to publicly available sources
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
index ac7d4eb..29d827a 100644 (file)
@@ -17,11 +17,15 @@ import org.springframework.scheduling.annotation.Scheduled;
 
 import javax.annotation.PreDestroy;
 
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.*;
+
 
 public class OutboxProducer
 {
   final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
 
+  public final static String HEADER = "#";
 
   private final OutboxRepository repository;
   private final KafkaProducer<String, String> producer;
@@ -41,9 +45,10 @@ public class OutboxProducer
     this.repository = repository;
 
     Properties props = new Properties();
-    props.put("bootstrap.servers", properties.bootstrapServers);
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
+    props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ENABLE_IDEMPOTENCE_CONFIG, true);
 
     this.producer = new KafkaProducer<>(props);
     this.topic = properties.topic;
@@ -83,7 +88,7 @@ public class OutboxProducer
         new ProducerRecord<>(topic, item.getKey(), item.getValue());
 
     sequenceNumber = item.getSequenceNumber();
-    record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber));
+    record.headers().add(HEADER, Longs.toByteArray(sequenceNumber));
 
     producer.send(record, (metadata, e) ->
     {