- send100Messages(counter ->
- counter == 77
- ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
- : new Bytes(longSerializer.serialize(TOPIC, counter)));
+ offsetConsumer.assign(partitions());
+ partitions().forEach(tp ->
+ {
+ Long offset = offsetConsumer.position(tp);
+ log.info("New position for {}: {}", tp, offset);
+ Integer partition = tp.partition();
+ StateDocument document =
+ partitionStatisticsRepository
+ .findById(partition.toString())
+ .orElse(new StateDocument(partition));
+ document.offset = offset;
+ partitionStatisticsRepository.save(document);
+ });
+ offsetConsumer.unsubscribe();
+ }