X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FGenericApplicationTest.java;h=a6d6aa1df775355e7fd524b3a228909c657f79c1;hb=657bf71b6c1c99065f26cccf0c3d2a1f30bc9407;hp=68f150f4a8827921912a70068a6633fd67b286ce;hpb=80f616369c011db99eddf42c6ee91e66fd1dfd07;p=demos%2Fkafka%2Ftraining

diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTest.java b/src/test/java/de/juplo/kafka/GenericApplicationTest.java
index 68f150f..a6d6aa1 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTest.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTest.java
@@ -49,13 +49,14 @@ abstract class GenericApplicationTest<K, V>
   @Autowired
   KafkaConsumer<K, V> kafkaConsumer;
   @Autowired
+  Consumer<ConsumerRecord<K, V>> consumer;
+  @Autowired
   ApplicationProperties properties;
   @Autowired
   ExecutorService executor;
 
   KafkaProducer<Bytes, Bytes> testRecordProducer;
   KafkaConsumer<Bytes, Bytes> offsetConsumer;
-  Consumer<ConsumerRecord<K, V>> testHandler;
   EndlessConsumer<K, V> endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
   Map<TopicPartition, Long> newOffsets;
@@ -77,7 +78,7 @@ abstract class GenericApplicationTest<K, V>
   @Test
   void commitsCurrentOffsetsOnSuccess()
   {
-    recordGenerator.generate(100, Set.of(), messageSender);
+    recordGenerator.generate(100, Set.of(), Set.of(), messageSender);
 
     await("100 records received")
         .atMost(Duration.ofSeconds(30))
@@ -101,7 +102,7 @@ abstract class GenericApplicationTest<K, V>
   @Test
   void commitsOffsetOfErrorForReprocessingOnDeserializationError()
   {
-    recordGenerator.generate(100, Set.of(77), messageSender);
+    recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);
 
     await("Consumer failed")
         .atMost(Duration.ofSeconds(30))
@@ -131,6 +132,39 @@ abstract class GenericApplicationTest<K, V>
         .containsInstanceOf(RecordDeserializationException.class);
   }
 
+  @Test
+  void doesNotCommitOffsetsOnLogicError()
+  {
+    recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);
+
+    await("Consumer failed")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(1))
+        .until(() -> !endlessConsumer.running());
+
+    checkSeenOffsetsForProgress();
+    compareToCommitedOffsets(oldOffsets);
+
+    endlessConsumer.start();
+    await("Consumer failed")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(1))
+        .until(() -> !endlessConsumer.running());
+
+    checkSeenOffsetsForProgress();
+    compareToCommitedOffsets(oldOffsets);
+    assertThat(receivedRecords.size())
+        .describedAs("Received not all sent events")
+        .isLessThan(100);
+
+    assertThatNoException()
+        .describedAs("Consumer should not be running")
+        .isThrownBy(() -> endlessConsumer.exitStatus());
+    assertThat(endlessConsumer.exitStatus())
+        .describedAs("Consumer should have exited abnormally")
+        .containsInstanceOf(RuntimeException.class);
+  }
+
 
   /** Helper methods for the verification of expectations */
 
@@ -204,7 +238,8 @@ abstract class GenericApplicationTest<K, V>
   {
     void generate(
         int numberOfMessagesToGenerate,
-        Set<Integer> poistionPills,
+        Set<Integer> poisonPills,
+        Set<Integer> logicErrors,
         Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
   }
 
@@ -252,8 +287,6 @@ abstract class GenericApplicationTest<K, V>
     props.put("value.deserializer", BytesDeserializer.class.getName());
     offsetConsumer = new KafkaConsumer<>(props);
 
-    testHandler = record -> {} ;
-
     seekToEnd();
 
     oldOffsets = new HashMap<>();
@@ -273,7 +306,7 @@ abstract class GenericApplicationTest<K, V>
           new TopicPartition(record.topic(), record.partition()),
           record.offset());
       receivedRecords.add(record);
-      testHandler.accept(record);
+      consumer.accept(record);
     };
 
     endlessConsumer =
@@ -305,5 +338,7 @@ abstract class GenericApplicationTest<K, V>
 
   @TestConfiguration
   @Import(ApplicationConfiguration.class)
-  public static class Configuration {}
+  public static class Configuration
+  {
+  }
 }