WIP: Using assign instead of subscribe
[demos/kafka/outbox] / delivery / src / main / java / de / juplo / kafka / outbox / delivery / OutboxProducer.java
index 331374d..f7b0c7f 100644 (file)
@@ -65,8 +65,6 @@ public class OutboxProducer
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
-    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-    consumer.subscribe(Arrays.asList(this.topic));
     List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
     Set<TopicPartition> assignment = new HashSet<>();
     for (PartitionInfo info : partitions)
@@ -77,6 +75,9 @@ public class OutboxProducer
 
     LOG.info("Using topic {} with {} partitions", topic, partitions);
 
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+    consumer.assign(assignment);
+
     this.watermarks = new Watermarks(partitions.size());
 
     long[] currentOffsets = new long[partitions.size()];