X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=d7eb0398453a0229e15abacbc6d3c0903be842bd;hb=refs%2Fheads%2Frebalance-listener;hp=fc5d4c9e51d21348a9271fad7877bbd07b9ed0fe;hpb=2da45caa1f9d32e3a5506d71cce7f06fa2e36523;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index fc5d4c9..d7eb039 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -12,7 +12,6 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; @@ -41,11 +40,9 @@ import static org.awaitility.Awaitility.*; properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", "consumer.topic=" + TOPIC, - "consumer.commit-interval=1s", - "spring.mongodb.embedded.version=4.4.13" }) + "consumer.commit-interval=1s" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration -@AutoConfigureDataMongo @Slf4j class ApplicationTests { @@ -64,17 +61,13 @@ class ApplicationTests @Autowired KafkaConsumer offsetConsumer; @Autowired - PartitionStatisticsRepository partitionStatisticsRepository; - @Autowired ApplicationProperties properties; @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository repository; - @Autowired - KeyCountingRebalanceListener keyCountingRebalanceListener; + ApplicationRebalanceListener rebalanceListener; @Autowired - KeyCountingRecordHandler keyCountingRecordHandler; + ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -191,30 +184,24 @@ class ApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { + // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); - Integer partition = tp.partition(); - StatisticsDocument document = - partitionStatisticsRepository - .findById(partition.toString()) - .orElse(new StatisticsDocument(partition)); - document.offset = offset; - partitionStatisticsRepository.save(document); }); + // The new positions must be commited! + offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - partitions().forEach(tp -> - { - String partition = Integer.toString(tp.partition()); - Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); - consumer.accept(tp, offset.orElse(0l)); - }); - } + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); + offsetConsumer.unsubscribe(); + } List partitions() { @@ -283,7 +270,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(keyCountingRecordHandler) { + new TestRecordHandler(recordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -300,7 +287,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, + rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); @@ -348,8 +335,7 @@ class ApplicationTests Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("enable.auto.commit", false); - props.put("auto.offset.reset", "latest"); + props.put("group.id", properties.getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName());