X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=blobdiff_plain;f=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;fp=delivery%2Fsrc%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Foutbox%2Fdelivery%2FOutboxProducer.java;h=f7b0c7f4a356e638b260e4ac18220d789b051b82;hp=331374df1d71727344107a4245f2472c4a6b7400;hb=3f41296dae5c094a29f8a89cda2bccfb8bc93c0a;hpb=a92d318043bf698dc2a949db2b76893c8abf03a1 diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 331374d..f7b0c7f 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -65,8 +65,6 @@ public class OutboxProducer props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - KafkaConsumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Arrays.asList(this.topic)); List partitions = consumer.listTopics().get(this.topic); Set assignment = new HashSet<>(); for (PartitionInfo info : partitions) @@ -77,6 +75,9 @@ public class OutboxProducer LOG.info("Using topic {} with {} partitions", topic, partitions); + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.assign(assignment); + this.watermarks = new Watermarks(partitions.size()); long[] currentOffsets = new long[partitions.size()];