]> juplo.de Git - demos/kafka/outbox/commitdiff
WIP: Using assign instead of subscribe
authorKai Moritz <kai@juplo.de>
Tue, 20 Apr 2021 20:50:26 +0000 (22:50 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 May 2021 22:02:47 +0000 (00:02 +0200)
This clearifies, that the current implementation does not scale.
Subscribe suggests, that one can run multiple instances of the service
concurrently. But doing so would break this version of the implementation.

TODO: Partitionen bestimmen...

delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java

index 331374df1d71727344107a4245f2472c4a6b7400..f7b0c7f4a356e638b260e4ac18220d789b051b82 100644 (file)
@@ -65,8 +65,6 @@ public class OutboxProducer
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
-    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-    consumer.subscribe(Arrays.asList(this.topic));
     List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
     Set<TopicPartition> assignment = new HashSet<>();
     for (PartitionInfo info : partitions)
@@ -77,6 +75,9 @@ public class OutboxProducer
 
     LOG.info("Using topic {} with {} partitions", topic, partitions);
 
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.assign(assignment);
+
     this.watermarks = new Watermarks(partitions.size());
 
     long[] currentOffsets = new long[partitions.size()];