From b7fdd6da4eea8491f09e5239588935d094e9ad5a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 Jun 2022 07:14:54 +0200 Subject: [PATCH] =?utf8?q?Springify:=20Test=20f=C3=BCr=20das=20lesen=20gem?= =?utf8?q?ischter=20Nachrichten=20mit=20aller=20Fehler-Typen?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/FooMessage.java | 11 ++++ .../java/de/juplo/kafka/ApplicationTests.java | 60 ++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/FooMessage.java diff --git a/src/main/java/de/juplo/kafka/FooMessage.java b/src/main/java/de/juplo/kafka/FooMessage.java new file mode 100644 index 0000000..70c2544 --- /dev/null +++ b/src/main/java/de/juplo/kafka/FooMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Data; + + +@Data +public class FooMessage +{ + private String client; + private Long timestamp; +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index c3fbe45..3c97bc3 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -102,7 +102,6 @@ class ApplicationTests .isTrue(); } - @Test void mixedMessages() { @@ -128,6 +127,55 @@ class ApplicationTests .isTrue(); } + @Test + void unknownMessagesAndPoisonPillsForAllTypes() + { + send100Messages((key, counter) -> + { + switch((int) (counter%10)) + { + case 1: + return serializeString("BOOM!", "message"); + case 2: + return serializeString("BOOM!", "greeting"); + case 3: + return serializeString("BOOM!", "foo"); + case 4: + return serializeString("BOOM!", "bar"); + case 5: + return serializeAsFooMessage(key); + case 6: + case 7: + return serializeAsGreeting(key); + default: + return serializeAsClientMessage(key, counter); + } + }); + + await("100 records received") + .atMost(Duration.ofSeconds(30)) + .until(() -> received.size() == 50); + + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> + { + // UNSCHÖN: + // Funktioniert nur, weil nach den Nachrichten, die einen + // Deserialisierungs-Fehler auslösen noch valide Nachrichten + // gelesen werden. + // GRUND: + // Der MessageHandler sieht die Offsets der Fehlerhaften + // Nachrichten nicht! + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + }); + + assertThat(endlessConsumer.isRunning()) + .describedAs("Consumer should still be running") + .isTrue(); + } + @Test void commitsCurrentOffsetsOnDeserializationError() { @@ -199,7 +247,7 @@ class ApplicationTests Long expected = offsetsToCheck.get(tp) + 1; log.debug("TEST: Comparing the expected offset of {} for {} to {}", expected, tp, offset); assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") + .describedAs("Committed offset corresponds to the offset of the consumer for " + tp) .isEqualTo(expected); }); } @@ -302,6 +350,14 @@ class ApplicationTests return new BytesAndType(serialize(greeting), "greeting"); } + BytesAndType serializeAsFooMessage(Integer key) + { + FooMessage foo = new FooMessage(); + foo.setClient(key.toString()); + foo.setTimestamp(System.currentTimeMillis()); + return new BytesAndType(serialize(foo), "foo"); + } + BytesAndType serializeString(String message, String messageType) { return new BytesAndType(new Bytes(message.getBytes()), messageType); -- 2.20.1