props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(this.topic));
List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
Set<TopicPartition> assignment = new HashSet<>();
for (PartitionInfo info : partitions)
LOG.info("Using topic {} with {} partitions", topic, partitions);
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumer.assign(assignment);
+
this.watermarks = new Watermarks(partitions.size());
long[] currentOffsets = new long[partitions.size()];