From: Kai Moritz <kai@juplo.de>
Date: Sat, 8 Jun 2024 09:33:53 +0000 (+0200)
Subject: counter: 1.2.15 - Added assertion for the expected state
X-Git-Tag: counter-1.2.15~1
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=3eb1ec997478982fffb31f09c21e756b529e474a;p=demos%2Fkafka%2Fwordcount

counter: 1.2.15 - Added assertion for the expected state
---

diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
index 738bc98..6872d5d 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
@@ -16,6 +16,7 @@ import org.springframework.kafka.support.serializer.JsonSerde;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
@@ -90,6 +91,6 @@ public class CounterApplicationConfiguriation
 	@Bean
 	public KeyValueBytesStoreSupplier storeSupplier()
 	{
-		return Stores.persistentKeyValueStore("counter");
+		return Stores.persistentKeyValueStore(STORE_NAME);
 	}
 }
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
index b1343a7..712ab65 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
@@ -2,14 +2,13 @@ package de.juplo.kafka.wordcount.counter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import java.util.Properties;
 
@@ -17,6 +16,9 @@ import java.util.Properties;
 @Slf4j
 public class CounterStreamProcessor
 {
+	public static final String STORE_NAME = "counter";
+
+
 	public final KafkaStreams streams;
 
 
@@ -59,6 +61,11 @@ public class CounterStreamProcessor
 		return topology;
 	}
 
+	ReadOnlyKeyValueStore<Word, Long> getStore()
+	{
+		return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+	}
+
 	public void start()
 	{
 		log.info("Starting Stream-Processor");
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index d08930c..a53ffc8 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
@@ -28,6 +28,7 @@ import java.time.Duration;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -55,6 +56,8 @@ public class CounterApplicationIT
 
 	@Autowired
 	Consumer consumer;
+	@Autowired
+	CounterStreamProcessor streamProcessor;
 
 
 	@BeforeAll
@@ -109,6 +112,15 @@ public class CounterApplicationIT
 				.untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
 	}
 
+	@DisplayName("Await the expected state in the state-store")
+	@Test
+	public void testAwaitExpectedState()
+	{
+		await("Expected state")
+				.atMost(Duration.ofSeconds(5))
+				.untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+	}
+
 
 	static class Consumer
 	{
@@ -143,7 +155,7 @@ public class CounterApplicationIT
 		@Bean
 		KeyValueBytesStoreSupplier inMemoryStoreSupplier()
 		{
-			return Stores.inMemoryKeyValueStore("TEST-STORE");
+			return Stores.inMemoryKeyValueStore(STORE_NAME);
 		}
 	}
 }
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
index 4b67052..6e244e2 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
@@ -9,6 +9,7 @@ import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -24,8 +25,9 @@ import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.
 @Slf4j
 public class CounterStreamProcessorTopologyTest
 {
-  public final static String IN = "TEST-IN";
-  public final static String OUT = "TEST-OUT";
+  public static final String IN = "TEST-IN";
+  public static final String OUT = "TEST-OUT";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
 
 
   TopologyTestDriver testDriver;
@@ -39,7 +41,7 @@ public class CounterStreamProcessorTopologyTest
     Topology topology = CounterStreamProcessor.buildTopology(
         IN,
         OUT,
-        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
+        Stores.inMemoryKeyValueStore(STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
@@ -75,6 +77,9 @@ public class CounterStreamProcessorTopologyTest
 
     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
     TestData.assertExpectedLastMessagesForWord(receivedMessages);
+
+    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
   }
 
   @AfterEach
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
index b5c6c46..4e431e1 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
@@ -4,6 +4,7 @@ import de.juplo.kafka.wordcount.splitter.TestInputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWord;
 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
@@ -109,6 +110,25 @@ class TestData
 		return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages);
 	}
 
+	static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+	{
+		assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
+		assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
+		assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
+		assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
+		assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+	}
+
+	private static Word wordOf(TestOutputWord testOutputWord)
+	{
+		Word word = new Word();
+
+		word.setUser(testOutputWord.getUser());
+		word.setWord(testOutputWord.getWord());
+
+		return word;
+	}
+
 	static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
 	{
 		assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
@@ -118,6 +138,17 @@ class TestData
 		assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
 	}
 
+	private static void assertWordCountEqualsWordCountFromLastMessage(
+			TestOutputWord word,
+			Long counter)
+	{
+		TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+				word.getUser(),
+				word.getWord(),
+				counter);
+		assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+	}
+
 	private static void assertWordCountEqualsWordCountFromLastMessage(
 			TestOutputWord word,
 			TestOutputWordCounter counter)