From: Kai Moritz <kai@juplo.de>
Date: Sat, 8 Jun 2024 09:01:22 +0000 (+0200)
Subject: counter: 1.2.15 - Added assertion for the expected final output messages
X-Git-Tag: counter-1.2.15~2
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=237133719b1d06d542302fb948d9cf3aff80a8a4;p=demos%2Fkafka%2Fwordcount

counter: 1.2.15 - Added assertion for the expected final output messages
---

diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index a771904..d08930c 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
@@ -100,6 +100,15 @@ public class CounterApplicationIT
 				.untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));
 	}
 
+	@DisplayName("Await the expected final output messages")
+	@Test
+	public void testAwaitExpectedLastMessagesForUsers()
+	{
+		await("Expected final output messages")
+				.atMost(Duration.ofSeconds(5))
+				.untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
+	}
+
 
 	static class Consumer
 	{
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
index e761679..4b67052 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
@@ -74,6 +74,7 @@ public class CounterStreamProcessorTopologyTest
     TestData.assertExpectedMessages(receivedMessages);
 
     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
+    TestData.assertExpectedLastMessagesForWord(receivedMessages);
   }
 
   @AfterEach
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
index c9d871a..b5c6c46 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
@@ -104,6 +104,42 @@ class TestData
 		return messagesForUsers.get(word).size();
 	}
 
+	static Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> expectedLastMessagesForWordAssertion()
+	{
+		return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages);
+	}
+
+	static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+	{
+		assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
+		assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
+		assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
+		assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
+		assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
+	}
+
+	private static void assertWordCountEqualsWordCountFromLastMessage(
+			TestOutputWord word,
+			TestOutputWordCounter counter)
+	{
+		assertThat(counter).isEqualTo(getLastMessageFor(word));
+	}
+
+	private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
+	{
+		return getLastMessageFor(word, expectedMessages());
+	}
+
+	private static TestOutputWordCounter getLastMessageFor(
+			TestOutputWord user,
+			MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
+	{
+		return messagesForWord
+				.get(user)
+				.stream()
+				.reduce(null, (left, right) -> right);
+	}
+
 	private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
 	{
 			KeyValue.pair(