X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=b5644b6ff9dce68844c10e4427b54bc5426e47b8;hb=refs%2Fheads%2Fspringified-consumer--serialization;hp=3bac537d6e387d5b07083776d8dc687387f2d8e2;hpb=be1b513f8bd7646f9ceb3a7ba90952641e3af125;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 3bac537..b5644b6 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -15,11 +15,13 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; +import java.time.LocalDateTime; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -56,7 +58,7 @@ class ApplicationTests @Autowired KafkaProducer kafkaProducer; @Autowired - KafkaConsumer kafkaConsumer; + KafkaConsumer kafkaConsumer; @Autowired KafkaConsumer offsetConsumer; @Autowired @@ -64,22 +66,34 @@ class ApplicationTests @Autowired ExecutorService executor; - Consumer> testHandler; - EndlessConsumer endlessConsumer; + Consumer> testHandler; + EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; - Set> receivedRecords; + Set> receivedRecords; /** Tests methods */ @Test - void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException + void commitsCurrentOffsetsOnSuccess() { send100Messages((partition, key, counter) -> { - Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter)); - return new ProducerRecord<>(TOPIC, partition, key, value); + Bytes value; + String type; + + if (counter%3 != 0) + { + value = serializeClientMessage(key, counter); + type = "message"; + } + else { + value = serializeGreeting(key); + type = "greeting"; + } + + return toRecord(partition, key, value, Optional.of(type)); }); await("100 records received") @@ -102,14 +116,89 @@ class ApplicationTests } @Test - void commitsOffsetOfErrorForReprocessingOnDeserializationError() + void commitsOffsetOfErrorForReprocessingOnDeserializationErrorInvalidMessage() { send100Messages((partition, key, counter) -> { - Bytes value = counter == 77 - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(valueSerializer.serialize(TOPIC, counter)); - return new ProducerRecord<>(TOPIC, partition, key, value); + Bytes value; + String type; + + if (counter == 77) + { + value = serializeFooMessage(key, counter); + type = null; + } + else + { + if (counter%3 != 0) + { + value = serializeClientMessage(key, counter); + type = "message"; + } + else { + value = serializeGreeting(key); + type = "greeting"; + } + } + + return toRecord(partition, key, value, Optional.ofNullable(type)); + }); + + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + + endlessConsumer.start(); + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + assertThat(receivedRecords.size()) + .describedAs("Received not all sent events") + .isLessThan(100); + + assertThatNoException() + .describedAs("Consumer should not be running") + .isThrownBy(() -> endlessConsumer.exitStatus()); + assertThat(endlessConsumer.exitStatus()) + .describedAs("Consumer should have exited abnormally") + .containsInstanceOf(RecordDeserializationException.class); + } + + @Test + void commitsOffsetOfErrorForReprocessingOnDeserializationErrorOnUnknownMessage() + { + send100Messages((partition, key, counter) -> + { + Bytes value; + String type; + + if (counter == 77) + { + value = serializeFooMessage(key, counter); + type = "foo"; + } + else + { + if (counter%3 != 0) + { + value = serializeClientMessage(key, counter); + type = "message"; + } + else { + value = serializeGreeting(key); + type = "greeting"; + } + } + + return toRecord(partition, key, value, Optional.of(type)); }); await("Consumer failed") @@ -211,12 +300,12 @@ class ApplicationTests public interface RecordGenerator { - public ProducerRecord generate(int partition, String key, long counter); + public ProducerRecord generate(int partition, String key, int counter); } void send100Messages(RecordGenerator recordGenerator) { - long i = 0; + int i = 0; for (int partition = 0; partition < 10; partition++) { @@ -249,6 +338,32 @@ class ApplicationTests } } + ProducerRecord toRecord(int partition, String key, Bytes value, Optional type) + { + ProducerRecord record = + new ProducerRecord<>(TOPIC, partition, key, value); + + type.ifPresent(typeId -> record.headers().add("__TypeId__", typeId.getBytes())); + return record; + } + + Bytes serializeClientMessage(String key, int value) + { + TestClientMessage message = new TestClientMessage(key, Integer.toString(value)); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } + + Bytes serializeGreeting(String key) + { + TestGreeting message = new TestGreeting(key, LocalDateTime.now()); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } + + Bytes serializeFooMessage(String key, int value) + { + TestFooMessage message = new TestFooMessage(key, (long)value); + return new Bytes(valueSerializer.serialize(TOPIC, message)); + } @BeforeEach public void init() @@ -267,7 +382,7 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = + Consumer> captureOffsetAndExecuteTestHandler = record -> { newOffsets.put( @@ -307,9 +422,9 @@ class ApplicationTests public static class Configuration { @Bean - Serializer serializer() + Serializer serializer() { - return new LongSerializer(); + return new JsonSerializer<>(); } @Bean