X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=1f18e59e1174a52e7b801fb940770ba2cc2a5cff;hb=4abc82d5d4cc80feabee91a10749b40f9bcfd879;hp=fc5d4c9e51d21348a9271fad7877bbd07b9ed0fe;hpb=2768e0f97c441ade5ce8ff371aa590fdc3cfd6c6;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index fc5d4c9..1f18e59 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -64,17 +64,15 @@ class ApplicationTests @Autowired KafkaConsumer offsetConsumer; @Autowired - PartitionStatisticsRepository partitionStatisticsRepository; - @Autowired ApplicationProperties properties; @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository repository; + StateRepository stateRepository; @Autowired - KeyCountingRebalanceListener keyCountingRebalanceListener; + ApplicationRebalanceListener rebalanceListener; @Autowired - KeyCountingRecordHandler keyCountingRecordHandler; + ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -196,12 +194,12 @@ class ApplicationTests Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); Integer partition = tp.partition(); - StatisticsDocument document = - partitionStatisticsRepository + StateDocument document = + stateRepository .findById(partition.toString()) - .orElse(new StatisticsDocument(partition)); + .orElse(new StateDocument(partition)); document.offset = offset; - partitionStatisticsRepository.save(document); + stateRepository.save(document); }); offsetConsumer.unsubscribe(); } @@ -211,7 +209,7 @@ class ApplicationTests partitions().forEach(tp -> { String partition = Integer.toString(tp.partition()); - Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); + Optional offset = stateRepository.findById(partition).map(document -> document.offset); consumer.accept(tp, offset.orElse(0l)); }); } @@ -283,7 +281,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(keyCountingRecordHandler) { + new TestRecordHandler(recordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -300,7 +298,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - keyCountingRebalanceListener, + rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();