From: Kai Moritz Date: Sun, 5 Jun 2022 05:14:54 +0000 (+0200) Subject: Springify: Test für das lesen gemischter Nachrichten mit aller Fehler-Typen X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=b7fdd6da4eea8491f09e5239588935d094e9ad5a;p=demos%2Fkafka%2Ftraining Springify: Test für das lesen gemischter Nachrichten mit aller Fehler-Typen --- diff --git a/src/main/java/de/juplo/kafka/FooMessage.java b/src/main/java/de/juplo/kafka/FooMessage.java new file mode 100644 index 0000000..70c2544 --- /dev/null +++ b/src/main/java/de/juplo/kafka/FooMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Data; + + +@Data +public class FooMessage +{ + private String client; + private Long timestamp; +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index c3fbe45..3c97bc3 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -102,7 +102,6 @@ class ApplicationTests .isTrue(); } - @Test void mixedMessages() { @@ -128,6 +127,55 @@ class ApplicationTests .isTrue(); } + @Test + void unknownMessagesAndPoisonPillsForAllTypes() + { + send100Messages((key, counter) -> + { + switch((int) (counter%10)) + { + case 1: + return serializeString("BOOM!", "message"); + case 2: + return serializeString("BOOM!", "greeting"); + case 3: + return serializeString("BOOM!", "foo"); + case 4: + return serializeString("BOOM!", "bar"); + case 5: + return serializeAsFooMessage(key); + case 6: + case 7: + return serializeAsGreeting(key); + default: + return serializeAsClientMessage(key, counter); + } + }); + + await("100 records received") + .atMost(Duration.ofSeconds(30)) + .until(() -> received.size() == 50); + + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> + { + // UNSCHÖN: + // Funktioniert nur, weil nach den Nachrichten, die einen + // Deserialisierungs-Fehler auslösen noch valide Nachrichten + // gelesen werden. + // GRUND: + // Der MessageHandler sieht die Offsets der Fehlerhaften + // Nachrichten nicht! + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + }); + + assertThat(endlessConsumer.isRunning()) + .describedAs("Consumer should still be running") + .isTrue(); + } + @Test void commitsCurrentOffsetsOnDeserializationError() { @@ -199,7 +247,7 @@ class ApplicationTests Long expected = offsetsToCheck.get(tp) + 1; log.debug("TEST: Comparing the expected offset of {} for {} to {}", expected, tp, offset); assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") + .describedAs("Committed offset corresponds to the offset of the consumer for " + tp) .isEqualTo(expected); }); } @@ -302,6 +350,14 @@ class ApplicationTests return new BytesAndType(serialize(greeting), "greeting"); } + BytesAndType serializeAsFooMessage(Integer key) + { + FooMessage foo = new FooMessage(); + foo.setClient(key.toString()); + foo.setTimestamp(System.currentTimeMillis()); + return new BytesAndType(serialize(foo), "foo"); + } + BytesAndType serializeString(String message, String messageType) { return new BytesAndType(new Bytes(message.getBytes()), messageType);