From ae04ac51dae8d4d8f7d3434e7ddf7a8b40bd5eac Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 09:36:43 +0200 Subject: [PATCH 01/16] counter: 1.2.15 - Refined `CounterApplicationIT` * Preparations for the addition of new tests * The messages are send only once during `@BeforeAll` --- .../counter/CounterApplicationIT.java | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index cb0a5b7..41e5b62 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -6,7 +6,8 @@ import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -16,6 +17,7 @@ import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -51,26 +53,39 @@ public class CounterApplicationIT public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; - @Autowired - KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; - @BeforeEach - public void clear() + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) { - consumer.received.clear(); + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); } - + @DisplayName("Await the expected output messages") @Test void testSendMessage() { - TestData - .getInputMessages() - .forEach(kv -> kafkaTemplate.send(TOPIC_IN, kv.key, kv.value)); - await("Expected messages") .atMost(Duration.ofSeconds(10)) .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion())); -- 2.20.1 From 1127988a1b631aa9e0c0107c1a3ed9f99edd188b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 10:35:34 +0200 Subject: [PATCH 02/16] counter: 1.2.15 - Added assertion for the expected number of messages --- .../counter/CounterApplicationIT.java | 9 +++++++++ .../CounterStreamProcessorTopologyTest.java | 2 ++ .../kafka/wordcount/counter/TestData.java | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 41e5b62..a771904 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -82,6 +82,15 @@ public class CounterApplicationIT }); } + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessagesForUsers() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedNumberOfMessagesForWordAssertion())); + } + @DisplayName("Await the expected output messages") @Test void testSendMessage() diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index a1b4c7f..e761679 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -72,6 +72,8 @@ public class CounterStreamProcessorTopologyTest .forEach(record -> receivedMessages.add(record.key(), record.value())); TestData.assertExpectedMessages(receivedMessages); + + TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); } @AfterEach diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 714e585..c9d871a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -85,6 +85,25 @@ class TestData .containsExactlyElementsOf(counter)); } + static Consumer> expectedNumberOfMessagesForWordAssertion() + { + return receivedMessages -> assertExpectedNumberOfMessagesForWord(receivedMessages); + } + + static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) + { + assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); + assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); + assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); + assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); + assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); + } + + private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) + { + return messagesForUsers.get(word).size(); + } + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( -- 2.20.1 From 237133719b1d06d542302fb948d9cf3aff80a8a4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 11:01:22 +0200 Subject: [PATCH 03/16] counter: 1.2.15 - Added assertion for the expected final output messages --- .../counter/CounterApplicationIT.java | 9 +++++ .../CounterStreamProcessorTopologyTest.java | 1 + .../kafka/wordcount/counter/TestData.java | 36 +++++++++++++++++++ 3 files changed, 46 insertions(+) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index a771904..d08930c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -100,6 +100,15 @@ public class CounterApplicationIT .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion())); } + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion())); + } + static class Consumer { diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index e761679..4b67052 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -74,6 +74,7 @@ public class CounterStreamProcessorTopologyTest TestData.assertExpectedMessages(receivedMessages); TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); + TestData.assertExpectedLastMessagesForWord(receivedMessages); } @AfterEach diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index c9d871a..b5c6c46 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -104,6 +104,42 @@ class TestData return messagesForUsers.get(word).size(); } + static Consumer> expectedLastMessagesForWordAssertion() + { + return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages); + } + + static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); + } + + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + TestOutputWordCounter counter) + { + assertThat(counter).isEqualTo(getLastMessageFor(word)); + } + + private static TestOutputWordCounter getLastMessageFor(TestOutputWord word) + { + return getLastMessageFor(word, expectedMessages()); + } + + private static TestOutputWordCounter getLastMessageFor( + TestOutputWord user, + MultiValueMap messagesForWord) + { + return messagesForWord + .get(user) + .stream() + .reduce(null, (left, right) -> right); + } + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( -- 2.20.1 From 3eb1ec997478982fffb31f09c21e756b529e474a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 11:33:53 +0200 Subject: [PATCH 04/16] counter: 1.2.15 - Added assertion for the expected state --- .../CounterApplicationConfiguriation.java | 3 +- .../counter/CounterStreamProcessor.java | 15 ++++++--- .../counter/CounterApplicationIT.java | 14 ++++++++- .../CounterStreamProcessorTopologyTest.java | 11 +++++-- .../kafka/wordcount/counter/TestData.java | 31 +++++++++++++++++++ 5 files changed, 65 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index 738bc98..6872d5d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -16,6 +16,7 @@ import org.springframework.kafka.support.serializer.JsonSerde; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -90,6 +91,6 @@ public class CounterApplicationConfiguriation @Bean public KeyValueBytesStoreSupplier storeSupplier() { - return Stores.persistentKeyValueStore("counter"); + return Stores.persistentKeyValueStore(STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index b1343a7..712ab65 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -2,14 +2,13 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Properties; @@ -17,6 +16,9 @@ import java.util.Properties; @Slf4j public class CounterStreamProcessor { + public static final String STORE_NAME = "counter"; + + public final KafkaStreams streams; @@ -59,6 +61,11 @@ public class CounterStreamProcessor return topology; } + ReadOnlyKeyValueStore getStore() + { + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + } + public void start() { log.info("Starting Stream-Processor"); diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index d08930c..a53ffc8 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -28,6 +28,7 @@ import java.time.Duration; import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -55,6 +56,8 @@ public class CounterApplicationIT @Autowired Consumer consumer; + @Autowired + CounterStreamProcessor streamProcessor; @BeforeAll @@ -109,6 +112,15 @@ public class CounterApplicationIT .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion())); } + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + } + static class Consumer { @@ -143,7 +155,7 @@ public class CounterApplicationIT @Bean KeyValueBytesStoreSupplier inMemoryStoreSupplier() { - return Stores.inMemoryKeyValueStore("TEST-STORE"); + return Stores.inMemoryKeyValueStore(STORE_NAME); } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 4b67052..6e244e2 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -9,6 +9,7 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -24,8 +25,9 @@ import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation. @Slf4j public class CounterStreamProcessorTopologyTest { - public final static String IN = "TEST-IN"; - public final static String OUT = "TEST-OUT"; + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; TopologyTestDriver testDriver; @@ -39,7 +41,7 @@ public class CounterStreamProcessorTopologyTest Topology topology = CounterStreamProcessor.buildTopology( IN, OUT, - Stores.inMemoryKeyValueStore("TOPOLOGY-TEST")); + Stores.inMemoryKeyValueStore(STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -75,6 +77,9 @@ public class CounterStreamProcessorTopologyTest TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); TestData.assertExpectedLastMessagesForWord(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); } @AfterEach diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index b5c6c46..4e431e1 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -4,6 +4,7 @@ import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -109,6 +110,25 @@ class TestData return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages); } + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); + } + + private static Word wordOf(TestOutputWord testOutputWord) + { + Word word = new Word(); + + word.setUser(testOutputWord.getUser()); + word.setWord(testOutputWord.getWord()); + + return word; + } + static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) { assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); @@ -118,6 +138,17 @@ class TestData assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); } + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + Long counter) + { + TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( + word.getUser(), + word.getWord(), + counter); + assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); + } + private static void assertWordCountEqualsWordCountFromLastMessage( TestOutputWord word, TestOutputWordCounter counter) -- 2.20.1 From 8d0426539d69616900f4d6ef19e52d50b497f57f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 12:26:10 +0200 Subject: [PATCH 05/16] counter: 1.2.15 - Inlined the asserting ``Consumer``s --- .../wordcount/counter/CounterApplicationIT.java | 9 ++++++--- .../juplo/kafka/wordcount/counter/TestData.java | 16 ---------------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index a53ffc8..9995ce7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -91,7 +91,8 @@ public class CounterApplicationIT { await("Expected number of messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedNumberOfMessagesForWordAssertion())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); } @DisplayName("Await the expected output messages") @@ -100,7 +101,8 @@ public class CounterApplicationIT { await("Expected messages") .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); } @DisplayName("Await the expected final output messages") @@ -109,7 +111,8 @@ public class CounterApplicationIT { await("Expected final output messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); } @DisplayName("Await the expected state in the state-store") diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 4e431e1..7446db6 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -8,7 +8,6 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -import java.util.function.Consumer; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -73,11 +72,6 @@ class TestData return Stream.of(TestData.INPUT_MESSAGES); } - static Consumer> expectedMessagesAssertion() - { - return receivedMessages -> assertExpectedMessages(receivedMessages); - } - static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( @@ -86,11 +80,6 @@ class TestData .containsExactlyElementsOf(counter)); } - static Consumer> expectedNumberOfMessagesForWordAssertion() - { - return receivedMessages -> assertExpectedNumberOfMessagesForWord(receivedMessages); - } - static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) { assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); @@ -105,11 +94,6 @@ class TestData return messagesForUsers.get(word).size(); } - static Consumer> expectedLastMessagesForWordAssertion() - { - return receivedMessages -> assertExpectedLastMessagesForWord(receivedMessages); - } - static void assertExpectedState(ReadOnlyKeyValueStore store) { assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); -- 2.20.1 From e6198710fe679bc7463d1b17c9d9dc311062ef31 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 5 Jun 2024 21:30:17 +0200 Subject: [PATCH 06/16] counter: 1.3.0 - (RED) Introduced domain-class `User` as key * _GREEN:_ The `CounterApplicationIT` does _not_ reveal the bug! * _RED:_ The `CounterStreamProcessorToplogyTest` fails with an exception, that gives a hint for the cause of the bug. * The bug is caused by missing type-specifications for the operation ``cout()``. * Before the introduction of the domain-class `User` everything worked as expected, because the class `Word` could be specified as default for the deserialization of the key. ** With the introduction of the domain-class `User` as key of the incoming messages, the default for the key has to switched to this class, to enable the application to deserialize incomming keys despite the missing type mapping. ** Beforehand, the default `Word` covered the missing type information for the ``count()``-operator. --- pom.xml | 2 +- .../CounterApplicationConfiguriation.java | 3 ++- .../counter/CounterStreamProcessor.java | 6 +---- .../juplo/kafka/wordcount/counter/User.java | 12 +++++++++ .../counter/CounterApplicationIT.java | 6 +++-- .../CounterStreamProcessorTopologyTest.java | 6 ++--- .../kafka/wordcount/counter/TestData.java | 27 ++++++++++--------- .../wordcount/splitter/TestInputUser.java | 14 ++++++++++ 8 files changed, 51 insertions(+), 25 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/User.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java diff --git a/pom.xml b/pom.xml index 3adeb56..5859736 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.2.15 + 1.3.0 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index 6872d5d..484b8de 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -50,10 +50,11 @@ public class CounterApplicationConfiguriation propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName()); + propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName()); propertyMap.put( JsonDeserializer.TYPE_MAPPINGS, + "user:" + User.class.getName() + "," + "word:" + Word.class.getName() + "," + "counter:" + WordCounter.class.getName()); diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 712ab65..fd5c5a7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -1,9 +1,7 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -43,9 +41,7 @@ public class CounterStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream( - inputTopic, - Consumed.with(Serdes.String(), null)); + KStream source = builder.stream(inputTopic); source .map((key, word) -> new KeyValue<>(word, word)) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java new file mode 100644 index 0000000..e38bcba --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/User.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class User +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 9995ce7..1bfceed 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.counter; +import de.juplo.kafka.wordcount.splitter.TestInputUser; import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; @@ -34,6 +35,7 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", "spring.kafka.consumer.auto-offset-reset=earliest", @@ -62,7 +64,7 @@ public class CounterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -70,7 +72,7 @@ public class CounterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 6e244e2..0ffd516 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -1,10 +1,10 @@ package de.juplo.kafka.wordcount.counter; +import de.juplo.kafka.wordcount.splitter.TestInputUser; import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; @@ -31,7 +31,7 @@ public class CounterStreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic in; + TestInputTopic in; TestOutputTopic out; @@ -47,7 +47,7 @@ public class CounterStreamProcessorTopologyTest in = testDriver.createInputTopic( IN, - new StringSerializer(), + new JsonSerializer().noTypeInfo(), new JsonSerializer().noTypeInfo()); out = testDriver.createOutputTopic( diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 7446db6..54e6287 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.counter; +import de.juplo.kafka.wordcount.splitter.TestInputUser; import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; @@ -30,44 +31,44 @@ class TestData static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_HALLO)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_MÜSCH)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_WELT)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_MÜSCH)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_WELT)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), new KeyValue<>( - PETER, + TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), new KeyValue<>( - KLAUS, + TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), }; - static Stream> getInputMessages() + static Stream> getInputMessages() { return Stream.of(TestData.INPUT_MESSAGES); } diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java new file mode 100644 index 0000000..2255b61 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestInputUser +{ + String user; +} -- 2.20.1 From 19226e74f738681af8f4d6a574cf9e69eae1da90 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 5 Jun 2024 21:44:26 +0200 Subject: [PATCH 07/16] counter: 1.3.0 - (RED) Made `CounterApplicationIT` fail too * Validate with: `mvn test -Dtest=CounterApplicationI` * Turning on the caching forces the application, to serialize and deserialize _every_ message, hence, revealing the bug, that was only detected by the `CounterStreamProcessorToplogyTest` before, that always behaves like this. --- .../de/juplo/kafka/wordcount/counter/CounterApplicationIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 1bfceed..334cd05 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -46,7 +46,6 @@ import static org.awaitility.Awaitility.await; "logging.level.de.juplo=DEBUG", "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.wordcount.counter.commit-interval=0", - "juplo.wordcount.counter.cacheMaxBytes=0", "juplo.wordcount.counter.input-topic=" + TOPIC_IN, "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) -- 2.20.1 From 98f4de98c6c3f527496d85b5e32fef0a3a23cbd5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 5 Jun 2024 23:57:32 +0200 Subject: [PATCH 08/16] counter: 1.3.0 - (GREEN) Fixed the typing for the state-store --- .../kafka/wordcount/counter/CounterStreamProcessor.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index fd5c5a7..64bd619 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -7,6 +7,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; import java.util.Properties; @@ -46,7 +47,10 @@ public class CounterStreamProcessor source .map((key, word) -> new KeyValue<>(word, word)) .groupByKey() - .count(Materialized.as(storeSupplier)) + .count( + Materialized + .as(storeSupplier) + .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys())) .toStream() .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) .to(outputTopic); -- 2.20.1 From 5aa03935c0d09c363dc2b3ddcbd5fc9aac93b8e1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 19:39:00 +0200 Subject: [PATCH 09/16] counter: 1.3.0 - Fixed possible NPE in `Counter10ApplicationIT` --- src/test/java/de/juplo/kafka/wordcount/counter/TestData.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 54e6287..1ecfdbd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -92,7 +92,9 @@ class TestData private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) { - return messagesForUsers.get(word).size(); + return messagesForUsers.get(word) == null + ? 0 + : messagesForUsers.get(word).size(); } static void assertExpectedState(ReadOnlyKeyValueStore store) -- 2.20.1 From d8173052504e89f85b09ce060302e87979973714 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 18:26:47 +0200 Subject: [PATCH 10/16] counter: 1.3.1 - Cleand code/setup for tests --- pom.xml | 2 +- .../counter/CounterApplicationIT.java | 8 +++---- .../CounterStreamProcessorTopologyTest.java | 10 +++------ .../kafka/wordcount/counter/TestData.java | 22 +++++++++---------- 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/pom.xml b/pom.xml index 5859736..03a7b40 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.3.0 + 1.3.1 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 334cd05..0faa2de 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -14,7 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; @@ -35,6 +34,7 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.main.allow-bean-definition-overriding=true", "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", @@ -45,7 +45,8 @@ import static org.awaitility.Awaitility.await; "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.counter.commit-interval=0", + "juplo.wordcount.counter.commit-interval=100", + "juplo.wordcount.counter.cache-max-bytes=0", "juplo.wordcount.counter.input-topic=" + TOPIC_IN, "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) @@ -155,9 +156,8 @@ public class CounterApplicationIT return new Consumer(); } - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KeyValueBytesStoreSupplier storeSupplier() { return Stores.inMemoryKeyValueStore(STORE_NAME); } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 0ffd516..9c86c6c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -20,6 +20,7 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; @Slf4j @@ -27,7 +28,6 @@ public class CounterStreamProcessorTopologyTest { public static final String IN = "TEST-IN"; public static final String OUT = "TEST-OUT"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; TopologyTestDriver testDriver; @@ -52,12 +52,8 @@ public class CounterStreamProcessorTopologyTest out = testDriver.createOutputTopic( OUT, - new JsonDeserializer() - .copyWithType(TestOutputWord.class) - .ignoreTypeHeaders(), - new JsonDeserializer() - .copyWithType(TestOutputWordCounter.class) - .ignoreTypeHeaders()); + new JsonDeserializer(TestOutputWord.class).ignoreTypeHeaders(), + new JsonDeserializer(TestOutputWordCounter.class).ignoreTypeHeaders()); } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 1ecfdbd..862eb2b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -33,37 +33,37 @@ class TestData private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_HALLO)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_MÜSCH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_WELT)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_MÜSCH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_WELT)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), }; -- 2.20.1 From c7f134403fb392077f24567d916a211949c7b197 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 19:18:43 +0200 Subject: [PATCH 11/16] counter: 1.3.1 - Removed the defaults for serialization/deserialization --- .../counter/CounterApplicationConfiguriation.java | 2 -- .../wordcount/counter/CounterStreamProcessor.java | 11 +++++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index 484b8de..d9579a5 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -50,8 +50,6 @@ public class CounterApplicationConfiguriation propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); - propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName()); propertyMap.put( JsonDeserializer.TYPE_MAPPINGS, "user:" + User.class.getName() + "," + diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 64bd619..8b9c12b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -2,7 +2,7 @@ package de.juplo.kafka.wordcount.counter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; @@ -42,9 +42,12 @@ public class CounterStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(inputTopic); - - source + builder + .stream( + inputTopic, + Consumed.with( + new JsonSerde<>(User.class), + new JsonSerde<>(Word.class))) .map((key, word) -> new KeyValue<>(word, word)) .groupByKey() .count( -- 2.20.1 From 29899898af28dcbb50326ae5837ada195829c592 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 19:38:39 +0200 Subject: [PATCH 12/16] counter: 1.3.1 - Refined `CounterStreamProcessorTopologyTest` * `CounterStreamProcessorTopologyTest` uses the type-headers to determine the correct type for the deserialization of the output-data. * Beforehand, the used types were hard-coded in the test. --- .../CounterStreamProcessorTopologyTest.java | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 9c86c6c..06a0798 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -19,6 +19,9 @@ import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import java.util.Map; +import java.util.stream.Collectors; + import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; @@ -45,15 +48,8 @@ public class CounterStreamProcessorTopologyTest testDriver = new TopologyTestDriver(topology, serializationConfig()); - in = testDriver.createInputTopic( - IN, - new JsonSerializer().noTypeInfo(), - new JsonSerializer().noTypeInfo()); - - out = testDriver.createOutputTopic( - OUT, - new JsonDeserializer(TestOutputWord.class).ignoreTypeHeaders(), - new JsonDeserializer(TestOutputWordCounter.class).ignoreTypeHeaders()); + in = testDriver.createInputTopic(IN, serializer(), serializer()); + out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); } @@ -83,4 +79,45 @@ public class CounterStreamProcessorTopologyTest { testDriver.close(); } + + + private static JsonSerializer serializer() + { + return new JsonSerializer().noTypeInfo(); + } + + private JsonDeserializer keyDeserializer() + { + return deserializer(true); + } + + private static JsonDeserializer valueDeserializer() + { + return deserializer(false); + } + + private static JsonDeserializer deserializer(boolean isKey) + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()), + isKey); + return deserializer; + } + + private static String typeMappingsConfig() + { + return typeMappings() + .entrySet() + .stream() + .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) + .collect(Collectors.joining(",")); + } + + private static Map typeMappings() + { + return Map.of( + "word", TestOutputWord.class, + "counter", TestOutputWordCounter.class); + } } -- 2.20.1 From 419be8ac0668ecb0e34b3a432cf2dcca1c3642dc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 19:57:23 +0200 Subject: [PATCH 13/16] counter: 1.3.1 - Refined/Simplified the type-mapping * Removed all explicit type-mappings for the internally used types. * This greatly simplifies the configuration, because it is sufficient to configure the trusted package to serialize/deserialize all internally used types. * To make this possible, the type-mappings for the outgoing messages are specified with `Produced.with()` in the ``to()``-operation. --- .../CounterApplicationConfiguriation.java | 6 +-- .../counter/CounterStreamProcessor.java | 43 ++++++++++++++++++- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index d9579a5..174521f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -50,11 +50,7 @@ public class CounterApplicationConfiguriation propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put( - JsonDeserializer.TYPE_MAPPINGS, - "user:" + User.class.getName() + "," + - "word:" + Word.class.getName() + "," + - "counter:" + WordCounter.class.getName()); + propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName()); return propertyMap; } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 8b9c12b..97d460f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -4,12 +4,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; +import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; @Slf4j @@ -56,7 +60,7 @@ public class CounterStreamProcessor .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys())) .toStream() .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) - .to(outputTopic); + .to(outputTopic, Produced.with(outKeySerde(), outValueSerde())); Topology topology = builder.build(); log.info("\n\n{}", topology.describe()); @@ -80,4 +84,41 @@ public class CounterStreamProcessor log.info("Stopping Stream-Processor"); streams.close(); } + + + + public static JsonSerde outKeySerde() + { + return serde(true); + } + + public static JsonSerde outValueSerde() + { + return serde(false); + } + + public static JsonSerde serde(boolean isKey) + { + JsonSerde serde = new JsonSerde<>(); + serde.configure( + Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()), + isKey); + return serde; + } + + private static String typeMappingsConfig() + { + return typeMappings() + .entrySet() + .stream() + .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) + .collect(Collectors.joining(",")); + } + + private static Map typeMappings() + { + return Map.of( + "word", Word.class, + "counter", WordCounter.class); + } } -- 2.20.1 From bdd62b6b906db3b10ae04d2b4c80e6426a90ad7f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 21:08:01 +0200 Subject: [PATCH 14/16] counter: 1.3.1 - Refined `CounterStreamProcessor` (serde-config) * Refactored the creation of the ``JsonSerde``s, that are used to consume the incomming messages. * All special ``Serdes``, that are used for incomming and outgoing messages, are created in separted methods now. * Removed unnecessary operatorx in the ``Materialized``-configuration for the state store (the operator is not necessary, because no headers are present, when deserializing from a store). --- .../counter/CounterStreamProcessor.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 97d460f..c983a25 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -47,17 +47,13 @@ public class CounterStreamProcessor StreamsBuilder builder = new StreamsBuilder(); builder - .stream( - inputTopic, - Consumed.with( - new JsonSerde<>(User.class), - new JsonSerde<>(Word.class))) + .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde())) .map((key, word) -> new KeyValue<>(word, word)) .groupByKey() .count( Materialized .as(storeSupplier) - .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys())) + .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed! .toStream() .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) .to(outputTopic, Produced.with(outKeySerde(), outValueSerde())); @@ -87,6 +83,16 @@ public class CounterStreamProcessor + public static JsonSerde inKeySerde() + { + return new JsonSerde<>(User.class); + } + + public static JsonSerde inValueSerde() + { + return new JsonSerde<>(Word.class); + } + public static JsonSerde outKeySerde() { return serde(true); -- 2.20.1 From 58681eb5c2ea655385c8209b07330e4ef0ab2c49 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 21:17:27 +0200 Subject: [PATCH 15/16] counter: 1.3.1 - Refined `CounterStreamProcessor` (DRY for type-mapping) --- .../counter/CounterStreamProcessor.java | 16 ++++++++-------- .../CounterStreamProcessorTopologyTest.java | 14 +------------- 2 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index c983a25..2304e55 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -114,17 +114,17 @@ public class CounterStreamProcessor private static String typeMappingsConfig() { - return typeMappings() - .entrySet() - .stream() - .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) - .collect(Collectors.joining(",")); + return typeMappingsConfig(Word.class, WordCounter.class); } - private static Map typeMappings() + public static String typeMappingsConfig(Class wordClass, Class wordCounterClass) { return Map.of( - "word", Word.class, - "counter", WordCounter.class); + "word", wordClass, + "counter", wordCounterClass) + .entrySet() + .stream() + .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) + .collect(Collectors.joining(",")); } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 06a0798..cfb6bd8 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -20,7 +20,6 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.util.Map; -import java.util.stream.Collectors; import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; @@ -107,17 +106,6 @@ public class CounterStreamProcessorTopologyTest private static String typeMappingsConfig() { - return typeMappings() - .entrySet() - .stream() - .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) - .collect(Collectors.joining(",")); - } - - private static Map typeMappings() - { - return Map.of( - "word", TestOutputWord.class, - "counter", TestOutputWordCounter.class); + return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class); } } -- 2.20.1 From f9156ed2e1f43b64d39cc39a13d12ca8c0d24219 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 16 Jun 2024 22:33:52 +0200 Subject: [PATCH 16/16] counter: 1.3.1 - Splitted up test in `CounterStreamProcessorTopologyTest` --- .../CounterStreamProcessorTopologyTest.java | 55 +++++++++++++------ 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index cfb6bd8..e80e383 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -11,8 +11,9 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; @@ -32,13 +33,12 @@ public class CounterStreamProcessorTopologyTest public static final String OUT = "TEST-OUT"; - TopologyTestDriver testDriver; - TestInputTopic in; - TestOutputTopic out; + static TopologyTestDriver testDriver; + static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); - @BeforeEach - public void setUpTestDriver() + @BeforeAll + public static void setUpTestDriver() { Topology topology = CounterStreamProcessor.buildTopology( IN, @@ -47,34 +47,53 @@ public class CounterStreamProcessorTopologyTest testDriver = new TopologyTestDriver(topology, serializationConfig()); - in = testDriver.createInputTopic(IN, serializer(), serializer()); - out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); - } - + TestInputTopic in = + testDriver.createInputTopic(IN, serializer(), serializer()); + TestOutputTopic out = + testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); - @Test - public void test() - { TestData .getInputMessages() .forEach(kv -> in.pipeInput(kv.key, kv.value)); - MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + receivedMessages = new LinkedMultiValueMap<>(); out .readRecordsToList() .forEach(record -> receivedMessages.add(record.key(), record.value())); + } + + @DisplayName("Assert the expected output messages") + @Test + public void testExpectedMessages() + { TestData.assertExpectedMessages(receivedMessages); + } + @DisplayName("Assert the expected number of messages") + @Test + public void testExpectedNumberOfMessagesForWord() + { TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testExpectedLastMessagesForWord() + { TestData.assertExpectedLastMessagesForWord(receivedMessages); + } + @DisplayName("Assert the expected state in the state-store") + @Test + public void testExpectedState() + { KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); TestData.assertExpectedState(store); } - @AfterEach - public void tearDown() + @AfterAll + public static void tearDown() { testDriver.close(); } @@ -85,7 +104,7 @@ public class CounterStreamProcessorTopologyTest return new JsonSerializer().noTypeInfo(); } - private JsonDeserializer keyDeserializer() + private static JsonDeserializer keyDeserializer() { return deserializer(true); } -- 2.20.1