X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=d7eb0398453a0229e15abacbc6d3c0903be842bd;hb=refs%2Fheads%2Frebalance-listener;hp=0909f2c838fe7fdaa14f3e6656bccc48b78179a0;hpb=487ca1ac0c73b1c07547d5316d145ed46490ba9d;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 0909f2c..d7eb039 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -65,7 +65,9 @@ class ApplicationTests @Autowired ExecutorService executor; @Autowired - KeyCountingRecordHandler keyCountingRecordHandler; + ApplicationRebalanceListener rebalanceListener; + @Autowired + ApplicationRecordHandler recordHandler; EndlessConsumer endlessConsumer; Map oldOffsets; @@ -268,7 +270,7 @@ class ApplicationTests }); TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(keyCountingRecordHandler) { + new TestRecordHandler(recordHandler) { @Override public void onNewRecord(ConsumerRecord record) { @@ -285,6 +287,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, + rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();