X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;h=29d827acde5aed9bd9d07ce3276ac521759d721c;hb=724def8b418ad48dcc2838b01058cf12ccb50a55;hp=999883f97ffca8844582d280b2d3100254a1b041;hpb=ca44f2f7e67fb263660adf43f816e5ec98a5e567;p=demos%2Fkafka%2Foutbox diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 999883f..29d827a 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -18,14 +18,14 @@ import org.springframework.scheduling.annotation.Scheduled; import javax.annotation.PreDestroy; import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.*; public class OutboxProducer { final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class); + public final static String HEADER = "#"; private final OutboxRepository repository; private final KafkaProducer producer; @@ -48,6 +48,7 @@ public class OutboxProducer props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(props); this.topic = properties.topic; @@ -87,7 +88,7 @@ public class OutboxProducer new ProducerRecord<>(topic, item.getKey(), item.getValue()); sequenceNumber = item.getSequenceNumber(); - record.headers().add("SEQ#", Longs.toByteArray(sequenceNumber)); + record.headers().add(HEADER, Longs.toByteArray(sequenceNumber)); producer.send(record, (metadata, e) -> {