From: Kai Moritz Date: Sun, 14 Aug 2022 10:06:01 +0000 (+0200) Subject: Tests aus gemerged springified-consumer--serialization -> deserialization X-Git-Tag: sumup-adder---lvm-2-tage~9^2~7 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=657bf71b6c1c99065f26cccf0c3d2a1f30bc9407;hp=80f616369c011db99eddf42c6ee91e66fd1dfd07;p=demos%2Fkafka%2Ftraining Tests aus gemerged springified-consumer--serialization -> deserialization * Es wurde nur der hinzugefügte Test übernommen. * Der hinzugefügte Test wurde an das von Spring-Kafka abweichende Verhalten bei einem Logik-Fehler angepasst: Kafka führt nicht automatisch Seeks oder einene Commit durch. Da `EndlessConsumer` bei einem Logik-Fehler explizit ein `unsubscribe()` durchführt, wird kein Offset-Commit durchgefürt, so dass die alten Offset-Positionen gültig bleiben. * Der Test wurde entsprechend umbenannt. * `RecordGenerator` wurde um einen weiteren Integer-Set erweitert, über den die Indizes der zu erzeugenden Logik-Fehler gesetzt werden können. * Der hinzugefügte Test wurde auf die überarbeitete Methode zur Erzeugung der Test-Nachrichten umgestellt. * `ApplicationTest` wurde so ergänzt, dass der für den hinzugefügten Test benötigte Logik-Fehler erzeugt wird. --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTest.java b/src/test/java/de/juplo/kafka/ApplicationTest.java index 81165ab..ed93a21 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTest.java +++ b/src/test/java/de/juplo/kafka/ApplicationTest.java @@ -1,14 +1,20 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.test.context.ContextConfiguration; import java.util.Set; import java.util.function.Consumer; +@ContextConfiguration(classes = ApplicationTest.Configuration.class) public class ApplicationTest extends GenericApplicationTest { public ApplicationTest() @@ -24,6 +30,7 @@ public class ApplicationTest extends GenericApplicationTest public void generate( int numberOfMessagesToGenerate, Set poisonPills, + Set logicErrors, Consumer> messageSender) { int i = 0; @@ -35,10 +42,15 @@ public class ApplicationTest extends GenericApplicationTest if (++i > numberOfMessagesToGenerate) return; - Bytes value = - poisonPills.contains(i) - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(longSerializer.serialize(TOPIC, (long)i)); + Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i)); + if (logicErrors.contains(i)) + { + value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE)); + } + if (poisonPills.contains(i)) + { + value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!")); + } ProducerRecord record = new ProducerRecord<>( @@ -53,4 +65,20 @@ public class ApplicationTest extends GenericApplicationTest } }); } + + + @TestConfiguration + public static class Configuration + { + @Primary + @Bean + public Consumer> consumer() + { + return (record) -> + { + if (record.value() == Long.MIN_VALUE) + throw new RuntimeException("BOOM (Logic-Error)!"); + }; + } + } } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTest.java b/src/test/java/de/juplo/kafka/GenericApplicationTest.java index 68f150f..a6d6aa1 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTest.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTest.java @@ -49,13 +49,14 @@ abstract class GenericApplicationTest @Autowired KafkaConsumer kafkaConsumer; @Autowired + Consumer> consumer; + @Autowired ApplicationProperties properties; @Autowired ExecutorService executor; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; - Consumer> testHandler; EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; @@ -77,7 +78,7 @@ abstract class GenericApplicationTest @Test void commitsCurrentOffsetsOnSuccess() { - recordGenerator.generate(100, Set.of(), messageSender); + recordGenerator.generate(100, Set.of(), Set.of(), messageSender); await("100 records received") .atMost(Duration.ofSeconds(30)) @@ -101,7 +102,7 @@ abstract class GenericApplicationTest @Test void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - recordGenerator.generate(100, Set.of(77), messageSender); + recordGenerator.generate(100, Set.of(77), Set.of(), messageSender); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -131,6 +132,39 @@ abstract class GenericApplicationTest .containsInstanceOf(RecordDeserializationException.class); } + @Test + void doesNotCommitOffsetsOnLogicError() + { + recordGenerator.generate(100, Set.of(), Set.of(77), messageSender); + + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(oldOffsets); + + endlessConsumer.start(); + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(oldOffsets); + assertThat(receivedRecords.size()) + .describedAs("Received not all sent events") + .isLessThan(100); + + assertThatNoException() + .describedAs("Consumer should not be running") + .isThrownBy(() -> endlessConsumer.exitStatus()); + assertThat(endlessConsumer.exitStatus()) + .describedAs("Consumer should have exited abnormally") + .containsInstanceOf(RuntimeException.class); + } + /** Helper methods for the verification of expectations */ @@ -204,7 +238,8 @@ abstract class GenericApplicationTest { void generate( int numberOfMessagesToGenerate, - Set poistionPills, + Set poisonPills, + Set logicErrors, Consumer> messageSender); } @@ -252,8 +287,6 @@ abstract class GenericApplicationTest props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); - testHandler = record -> {} ; - seekToEnd(); oldOffsets = new HashMap<>(); @@ -273,7 +306,7 @@ abstract class GenericApplicationTest new TopicPartition(record.topic(), record.partition()), record.offset()); receivedRecords.add(record); - testHandler.accept(record); + consumer.accept(record); }; endlessConsumer = @@ -305,5 +338,7 @@ abstract class GenericApplicationTest @TestConfiguration @Import(ApplicationConfiguration.class) - public static class Configuration {} + public static class Configuration + { + } }