X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;h=09c8789a3ac25697f70cabcb146473e90d5e5ecd;hp=c08cae7d6756655e5dbbe55dce19c2c3ed92256d;hb=70ec0ac46ecafcd10cb4224c67439f9039067bba;hpb=fbd0ca0df5004d13a5e93cdb8373bafc60440c8b diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index c08cae7..09c8789 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -16,6 +16,9 @@ import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.*; + @Component public class OutboxProducer @@ -36,15 +39,16 @@ public class OutboxProducer this.repository = repository; Properties props = new Properties(); - props.put("bootstrap.servers", properties.bootstrapServers); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); + props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ENABLE_IDEMPOTENCE_CONFIG, true); this.producer = new KafkaProducer<>(props); this.topic = properties.topic; } - @Scheduled(fixedDelay = 500) + @Scheduled(fixedDelayString = "${de.juplo.kafka.outbox.interval}") public void poll() { List items;