From: Kai Moritz Date: Tue, 20 Apr 2021 20:50:26 +0000 (+0200) Subject: WIP: Using assign instead of subscribe X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Foutbox;a=commitdiff_plain;h=3f41296dae5c094a29f8a89cda2bccfb8bc93c0a WIP: Using assign instead of subscribe This clearifies, that the current implementation does not scale. Subscribe suggests, that one can run multiple instances of the service concurrently. But doing so would break this version of the implementation. TODO: Partitionen bestimmen... --- diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 331374d..f7b0c7f 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -65,8 +65,6 @@ public class OutboxProducer props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - KafkaConsumer consumer = new KafkaConsumer<>(props); - consumer.subscribe(Arrays.asList(this.topic)); List partitions = consumer.listTopics().get(this.topic); Set assignment = new HashSet<>(); for (PartitionInfo info : partitions) @@ -77,6 +75,9 @@ public class OutboxProducer LOG.info("Using topic {} with {} partitions", topic, partitions); + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.assign(assignment); + this.watermarks = new Watermarks(partitions.size()); long[] currentOffsets = new long[partitions.size()];