From: Kai Moritz <kai@juplo.de>
Date: Sun, 5 Jun 2022 05:14:54 +0000 (+0200)
Subject: Springify: Test für das lesen gemischter Nachrichten mit aller Fehler-Typen
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=b7fdd6da4eea8491f09e5239588935d094e9ad5a;p=demos%2Fkafka%2Ftraining

Springify: Test für das lesen gemischter Nachrichten mit aller Fehler-Typen
---

diff --git a/src/main/java/de/juplo/kafka/FooMessage.java b/src/main/java/de/juplo/kafka/FooMessage.java
new file mode 100644
index 00000000..70c2544c
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/FooMessage.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Data;
+
+
+@Data
+public class FooMessage
+{
+  private String client;
+  private Long timestamp;
+}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index c3fbe457..3c97bc38 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -102,7 +102,6 @@ class ApplicationTests
 				.isTrue();
 	}
 
-
 	@Test
 	void mixedMessages()
 	{
@@ -128,6 +127,55 @@ class ApplicationTests
 				.isTrue();
 	}
 
+	@Test
+	void unknownMessagesAndPoisonPillsForAllTypes()
+	{
+		send100Messages((key, counter) ->
+		{
+			switch((int) (counter%10))
+			{
+				case 1:
+					return serializeString("BOOM!", "message");
+				case 2:
+					return serializeString("BOOM!", "greeting");
+				case 3:
+					return serializeString("BOOM!", "foo");
+				case 4:
+					return serializeString("BOOM!", "bar");
+				case 5:
+					return serializeAsFooMessage(key);
+				case 6:
+				case 7:
+					return serializeAsGreeting(key);
+				default:
+					return serializeAsClientMessage(key, counter);
+			}
+		});
+
+		await("100 records received")
+				.atMost(Duration.ofSeconds(30))
+				.until(() -> received.size() == 50);
+
+		await("Offsets committed")
+				.atMost(Duration.ofSeconds(10))
+				.untilAsserted(() ->
+				{
+					// UNSCHÖN:
+					// Funktioniert nur, weil nach den Nachrichten, die einen
+					// Deserialisierungs-Fehler auslösen noch valide Nachrichten
+					// gelesen werden.
+					// GRUND:
+					// Der MessageHandler sieht die Offsets der Fehlerhaften
+					// Nachrichten nicht!
+					checkSeenOffsetsForProgress();
+					compareToCommitedOffsets(newOffsets);
+				});
+
+		assertThat(endlessConsumer.isRunning())
+				.describedAs("Consumer should still be running")
+				.isTrue();
+	}
+
 	@Test
 	void commitsCurrentOffsetsOnDeserializationError()
 	{
@@ -199,7 +247,7 @@ class ApplicationTests
 			Long expected = offsetsToCheck.get(tp) + 1;
 			log.debug("TEST: Comparing the expected offset of {} for {} to {}", expected, tp, offset);
 			assertThat(offset)
-					.describedAs("Committed offset corresponds to the offset of the consumer")
+					.describedAs("Committed offset corresponds to the offset of the consumer for " + tp)
 					.isEqualTo(expected);
 		});
 	}
@@ -302,6 +350,14 @@ class ApplicationTests
 		return new BytesAndType(serialize(greeting), "greeting");
 	}
 
+	BytesAndType serializeAsFooMessage(Integer key)
+	{
+		FooMessage foo = new FooMessage();
+		foo.setClient(key.toString());
+		foo.setTimestamp(System.currentTimeMillis());
+		return new BytesAndType(serialize(foo), "foo");
+	}
+
 	BytesAndType serializeString(String message, String messageType)
 	{
 		return new BytesAndType(new Bytes(message.getBytes()), messageType);