X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTests.java;h=7335770aa01b87082c1f1df860066878d44fddd6;hb=384ed293b9d6d71f68031f92bd7bf60670a7ba4b;hp=54137b44d5a3a66f9557a7e4d3123b3e9e061d0b;hpb=1c6d263c619010d23bf502c14dda45db11a2baf6;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 54137b4..7335770 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -66,7 +66,7 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - PollIntervalAwareConsumerRebalanceListener rebalanceListener; + CommittingConsumerRebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; @@ -91,7 +91,7 @@ abstract class GenericApplicationTests /** Tests methods */ @Test - void commitsCurrentOffsetsOnSuccess() + void commitsCurrentOffsetsOnSuccess() throws Exception { int numberOfGeneratedMessages = recordGenerator.generate(false, false, messageSender); @@ -114,6 +114,7 @@ abstract class GenericApplicationTests .isThrownBy(() -> endlessConsumer.exitStatus()) .describedAs("Consumer should still be running"); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -211,7 +212,7 @@ abstract class GenericApplicationTests Long expected = offsetsToCheck.get(tp) + 1; log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") + .describedAs("Committed offset must be at most equal to the offset of the consumer") .isLessThanOrEqualTo(expected); isOffsetBehindSeen.add(offset < expected); }); @@ -385,7 +386,6 @@ abstract class GenericApplicationTests { try { - endlessConsumer.stop(); testRecordProducer.close(); offsetConsumer.close(); }