WIP: Using assign instead of subscribe
authorKai Moritz <kai@juplo.de>
Tue, 20 Apr 2021 20:50:26 +0000 (22:50 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 May 2021 22:02:47 +0000 (00:02 +0200)
This clearifies, that the current implementation does not scale.
Subscribe suggests, that one can run multiple instances of the service
concurrently. But doing so would break this version of the implementation.

TODO: Partitionen bestimmen...

delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java

index 331374d..f7b0c7f 100644 (file)
@@ -65,8 +65,6 @@ public class OutboxProducer
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
-    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-    consumer.subscribe(Arrays.asList(this.topic));
     List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
     Set<TopicPartition> assignment = new HashSet<>();
     for (PartitionInfo info : partitions)
@@ -77,6 +75,9 @@ public class OutboxProducer
 
     LOG.info("Using topic {} with {} partitions", topic, partitions);
 
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.assign(assignment);
+
     this.watermarks = new Watermarks(partitions.size());
 
     long[] currentOffsets = new long[partitions.size()];