(from parent 1: a92d318)
This clarifies that the current implementation does not scale.
subscribe() suggests that one can run multiple instances of the service
concurrently, but doing so would break this version of the implementation.

TODO: determine the partitions...
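For illustration only (a hypothetical sketch, not part of this commit): when switching from subscribe() to assign(), each instance must decide for itself which partitions it owns, since there is no consumer-group rebalancing to do it automatically. One simple scheme is a static modulo split; the class and method names below are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticAssignment {
    // Hypothetical helper: statically map partitions to one of numInstances
    // consumers. With assign(), the caller must make this split itself;
    // with subscribe(), the consumer group would rebalance automatically.
    static List<Integer> partitionsFor(int instanceId, int numInstances, int numPartitions) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numInstances == instanceId) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // Instance 0 of 2 over 5 partitions takes 0, 2, 4; instance 1 takes 1, 3.
        System.out.println(partitionsFor(0, 2, 5));
        System.out.println(partitionsFor(1, 2, 5));
    }
}
```

Each instance would then build its assignment set from partitionsFor(...) before calling consumer.assign(...).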
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(this.topic));
List<PartitionInfo> partitions = consumer.listTopics().get(this.topic);
Set<TopicPartition> assignment = new HashSet<>();
for (PartitionInfo info : partitions)
  assignment.add(new TopicPartition(this.topic, info.partition()));
LOG.info("Using topic {} with {} partitions", topic, partitions.size());
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ consumer.assign(assignment);
+
this.watermarks = new Watermarks(partitions.size());
long[] currentOffsets = new long[partitions.size()];