X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=d7eb0398453a0229e15abacbc6d3c0903be842bd;hb=refs%2Fheads%2Frebalance-listener;hp=3bac537d6e387d5b07083776d8dc687387f2d8e2;hpb=be1b513f8bd7646f9ceb3a7ba90952641e3af125;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 3bac537..d7eb039 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; @@ -24,7 +25,6 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -42,6 +42,7 @@ import static org.awaitility.Awaitility.*; "consumer.topic=" + TOPIC, "consumer.commit-interval=1s" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EnableAutoConfiguration @Slf4j class ApplicationTests { @@ -63,8 +64,11 @@ class ApplicationTests ApplicationProperties properties; @Autowired ExecutorService executor; + @Autowired + ApplicationRebalanceListener rebalanceListener; + @Autowired + ApplicationRecordHandler recordHandler; - Consumer> testHandler; EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; @@ -253,8 +257,6 @@ class ApplicationTests @BeforeEach public void init() { - testHandler = record -> {} ; - seekToEnd(); oldOffsets = new HashMap<>(); @@ -267,14 +269,16 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = - record -> - { - newOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - testHandler.accept(record); + TestRecordHandler captureOffsetAndExecuteTestHandler = + new TestRecordHandler(recordHandler) { + @Override + public void onNewRecord(ConsumerRecord record) + { + newOffsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); + receivedRecords.add(record); + } }; endlessConsumer = @@ -283,6 +287,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, + rebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start();