From 3f41296dae5c094a29f8a89cda2bccfb8bc93c0a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 20 Apr 2021 22:50:26 +0200 Subject: [PATCH] WIP: Using assign instead of subscribe This clearifies, that the current implementation does not scale. Subscribe suggests, that one can run multiple instances of the service concurrently. But doing so would break this version of the implementation. TODO: Partitionen bestimmen... --- .../java/de/juplo/kafka/outbox/delivery/OutboxProducer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 331374d..f7b0c7f 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -65,8 +65,6 @@ public class OutboxProducer props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - KafkaConsumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Arrays.asList(this.topic)); List partitions = consumer.listTopics().get(this.topic); Set assignment = new HashSet<>(); for (PartitionInfo info : partitions) @@ -77,6 +75,9 @@ public class OutboxProducer LOG.info("Using topic {} with {} partitions", topic, partitions); + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.assign(assignment); + this.watermarks = new Watermarks(partitions.size()); long[] currentOffsets = new long[partitions.size()]; -- 2.20.1