X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=d7eb0398453a0229e15abacbc6d3c0903be842bd;hb=refs%2Fheads%2Frebalance-listener;hp=26a34e406639f652c53878dcd8db12d7174598fd;hpb=c446512ec3bfa29e5e8482074cb6daf7e2ee1b2f;p=demos%2Fkafka%2Ftraining

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 26a34e4..d7eb039 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -11,6 +11,7 @@ import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
@@ -24,7 +25,6 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -40,8 +40,9 @@ import static org.awaitility.Awaitility.*;
 		properties = {
 				"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
 				"consumer.topic=" + TOPIC,
-				"consumer.commit-interval=100ms" })
+				"consumer.commit-interval=1s" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EnableAutoConfiguration
 @Slf4j
 class ApplicationTests
 {
@@ -63,8 +64,11 @@ class ApplicationTests
 	ApplicationProperties properties;
 	@Autowired
 	ExecutorService executor;
+	@Autowired
+	ApplicationRebalanceListener rebalanceListener;
+	@Autowired
+	ApplicationRecordHandler recordHandler;
 
-	Consumer<ConsumerRecord<String, Long>> testHandler;
 	EndlessConsumer<String, Long> endlessConsumer;
 	Map<TopicPartition, Long> oldOffsets;
 	Map<TopicPartition, Long> newOffsets;
@@ -84,10 +88,12 @@ class ApplicationTests
 
 		await("100 records received")
 				.atMost(Duration.ofSeconds(30))
+				.pollInterval(Duration.ofSeconds(1))
 				.until(() -> receivedRecords.size() >= 100);
 
 		await("Offsets committed")
 				.atMost(Duration.ofSeconds(10))
+				.pollInterval(Duration.ofSeconds(1))
 				.untilAsserted(() ->
 				{
 					checkSeenOffsetsForProgress();
@@ -112,6 +118,7 @@ class ApplicationTests
 
 		await("Consumer failed")
 				.atMost(Duration.ofSeconds(30))
+				.pollInterval(Duration.ofSeconds(1))
 				.until(() -> !endlessConsumer.running());
 
 		checkSeenOffsetsForProgress();
@@ -120,6 +127,7 @@ class ApplicationTests
 		endlessConsumer.start();
 		await("Consumer failed")
 				.atMost(Duration.ofSeconds(30))
+				.pollInterval(Duration.ofSeconds(1))
 				.until(() -> !endlessConsumer.running());
 
 		checkSeenOffsetsForProgress();
@@ -249,8 +257,6 @@ class ApplicationTests
 	@BeforeEach
 	public void init()
 	{
-		testHandler = record -> {} ;
-
 		seekToEnd();
 
 		oldOffsets = new HashMap<>();
@@ -263,14 +269,16 @@ class ApplicationTests
 			newOffsets.put(tp, offset - 1);
 		});
 
-		Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
-			record ->
-			{
-				newOffsets.put(
-					new TopicPartition(record.topic(), record.partition()),
-					record.offset());
-				receivedRecords.add(record);
-				testHandler.accept(record);
+		TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
+			new TestRecordHandler<String, Long>(recordHandler) {
+				@Override
+				public void onNewRecord(ConsumerRecord<String, Long> record)
+				{
+					newOffsets.put(
+						new TopicPartition(record.topic(), record.partition()),
+						record.offset());
+					receivedRecords.add(record);
+				}
 			};
 
 		endlessConsumer =
@@ -279,6 +287,7 @@ class ApplicationTests
 				properties.getClientId(),
 				properties.getTopic(),
 				kafkaConsumer,
+				rebalanceListener,
 				captureOffsetAndExecuteTestHandler);
 
 		endlessConsumer.start();
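
Note: the diff references a TestRecordHandler base class that is not part of
this changeset. Below is a minimal sketch of what such a delegating wrapper
could look like; the Consumer<ConsumerRecord<K, V>> supertype and the field
and parameter names are assumptions, since only the
TestRecordHandler(recordHandler) constructor and the onNewRecord() hook are
visible in the diff above.

import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;


// Hypothetical sketch of the wrapper the test subclasses in init().
public abstract class TestRecordHandler<K, V> implements Consumer<ConsumerRecord<K, V>>
{
	private final Consumer<ConsumerRecord<K, V>> handler;


	public TestRecordHandler(Consumer<ConsumerRecord<K, V>> handler)
	{
		this.handler = handler;
	}

	// Test hook: called once for every record, before the wrapped handler runs
	public abstract void onNewRecord(ConsumerRecord<K, V> record);

	@Override
	public void accept(ConsumerRecord<K, V> record)
	{
		onNewRecord(record);     // lets the test capture offsets and records
		handler.accept(record);  // then delegates to the wrapped handler
	}
}

With this shape, the test can observe every consumed record (to track offsets
in newOffsets and count receivedRecords) while the autowired
ApplicationRecordHandler still processes each record unchanged, which is what
the anonymous subclass in init() relies on.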