X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=f19bfb157cc0a942a742a736b1d718ca33b89f3c;hb=06d7dacb51009e77147cd097adf0ea602bd82dd0;hp=aa3dfd64efd1abcb923c3891745e9d2ea2354f44;hpb=eb0ead4c178df50cdaf2197cf4e63a1fe709e852;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index aa3dfd6..f19bfb1 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -71,9 +71,9 @@ class ApplicationTests @Autowired PartitionStatisticsRepository repository; @Autowired - WordcountRebalanceListener wordcountRebalanceListener; + SumRebalanceListener sumRebalanceListener; @Autowired - WordcountRecordHandler wordcountRecordHandler; + SumRecordHandler sumRecordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -156,10 +156,10 @@ class ApplicationTests Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); Integer partition = tp.partition(); - StatisticsDocument document = + StateDocument document = partitionStatisticsRepository .findById(partition.toString()) - .orElse(new StatisticsDocument(partition)); + .orElse(new StateDocument(partition)); document.offset = offset; partitionStatisticsRepository.save(document); }); @@ -243,7 +243,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(wordcountRecordHandler) { + new TestRecordHandler(sumRecordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -260,7 +260,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, - wordcountRebalanceListener, + sumRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();