X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=7f666f69b5412c242164b97c4c08ec6dae59008e;hb=cfbe7dcd3318ee846cb5890eea4328e36c5aa364;hp=fc5d4c9e51d21348a9271fad7877bbd07b9ed0fe;hpb=2768e0f97c441ade5ce8ff371aa590fdc3cfd6c6;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index fc5d4c9..7f666f6 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -64,14 +64,10 @@ class ApplicationTests @Autowired KafkaConsumer offsetConsumer; @Autowired - PartitionStatisticsRepository partitionStatisticsRepository; - @Autowired ApplicationProperties properties; @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository repository; - @Autowired KeyCountingRebalanceListener keyCountingRebalanceListener; @Autowired KeyCountingRecordHandler keyCountingRecordHandler; @@ -191,30 +187,24 @@ class ApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { + // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); - Integer partition = tp.partition(); - StatisticsDocument document = - partitionStatisticsRepository - .findById(partition.toString()) - .orElse(new StatisticsDocument(partition)); - document.offset = offset; - partitionStatisticsRepository.save(document); }); + // The new positions must be commited! + offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - partitions().forEach(tp -> - { - String partition = Integer.toString(tp.partition()); - Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); - consumer.accept(tp, offset.orElse(0l)); - }); - } + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); + offsetConsumer.unsubscribe(); + } List partitions() { @@ -348,8 +338,7 @@ class ApplicationTests Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("enable.auto.commit", false); - props.put("auto.offset.reset", "latest"); + props.put("group.id", properties.getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName());