counter: 1.2.15 - Separated serialization-config into a static method
author     Kai Moritz <kai@juplo.de>
           Wed, 5 Jun 2024 15:06:30 +0000 (17:06 +0200)
committer  Kai Moritz <kai@juplo.de>
           Sat, 8 Jun 2024 11:33:30 +0000 (13:33 +0200)
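
The JSON serde settings (default serdes, trusted packages, type mappings) move out of the @Bean method streamProcessorProperties() into the package-private static helper serializationConfig(). The topology test can then build its TopologyTestDriver from exactly the same serde configuration without instantiating the Spring configuration class. A condensed sketch of the new usage, taken from the diffs below:

    // CounterApplicationConfiguriation: serde settings shared between app and test
    static Properties serializationConfig()
    {
        Properties propertyMap = new Properties();
        propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
        propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
        // ... trusted packages and type mappings, unchanged from before ...
        return propertyMap;
    }

    // CounterStreamProcessorTopologyTest: no Spring wiring needed any more
    testDriver = new TopologyTestDriver(topology, serializationConfig());
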
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/counter/Word.java
src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
index 926045c..34217da 100644
@@ -25,12 +25,27 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St
 public class CounterApplicationConfiguriation
 {
        @Bean
-       public Properties streamProcessorProperties(CounterApplicationProperties counterProperties)
+       public Properties streamProcessorProperties(
+                       CounterApplicationProperties counterProperties)
        {
-               Properties propertyMap = new Properties();
+               Properties propertyMap = serializationConfig();
 
                propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
                propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
+               propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
+               if (counterProperties.getCommitInterval() != null)
+                       propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
+               if (counterProperties.getCacheMaxBytes() != null)
+                       propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
+               propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               return propertyMap;
+       }
+
+       static Properties serializationConfig()
+       {
+               Properties propertyMap = new Properties();
+
                propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
@@ -41,12 +56,6 @@ public class CounterApplicationConfiguriation
                                "word:" + Word.class.getName() + "," +
                                "counter:" + WordCounter.class.getName());
                propertyMap.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
-               propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
-               if (counterProperties.getCommitInterval() != null)
-                       propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
-               if (counterProperties.getCacheMaxBytes() != null)
-                       propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
-               propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
                return propertyMap;
        }
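
Reconstructed from the hunks above for readability: the bean method now starts from the shared serde settings and adds only the runtime configuration taken from the application properties.

    @Bean
    public Properties streamProcessorProperties(
            CounterApplicationProperties counterProperties)
    {
        Properties propertyMap = serializationConfig();

        propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
        propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
        propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
        if (counterProperties.getCommitInterval() != null)
            propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
        if (counterProperties.getCacheMaxBytes() != null)
            propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
        propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return propertyMap;
    }
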
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
index 4aa5ee2..77287d5 100644
@@ -1,13 +1,9 @@
 package de.juplo.kafka.wordcount.counter;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 
 
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
 @Data
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Word
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
index 1334e5b..f1fce71 100644
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.counter;
 
+import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -7,7 +8,7 @@ import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
 public class WordCounter
 {
   String user;
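
With the constructor access switched to PRIVATE, WordCounter instances can no longer be created through a generated public factory from outside the class; construction is confined to the class itself. The factory that takes over is not part of this diff, so the sketch below is only an illustration of the pattern: the method name countWordsOf is an assumption, and the word and counter fields are inferred from the test DTOs, not shown in this hunk.

    @Data
    @NoArgsConstructor
    @AllArgsConstructor(access = AccessLevel.PRIVATE)
    public class WordCounter
    {
      String user;
      String word;
      long counter;

      // Hypothetical factory method; the real one is not shown in this commit.
      public static WordCounter countWordsOf(Word word, long counter)
      {
        return new WordCounter(word.getUser(), word.getWord(), counter);
      }
    }
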
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index ad4faf2..025a160 100644
@@ -1,5 +1,8 @@
 package de.juplo.kafka.wordcount.counter;
 
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -20,7 +23,6 @@ import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
-import java.util.stream.Stream;
 
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
@@ -35,8 +37,8 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
-                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.Word",
-                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.WordCounter",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.TestOutputWord",
+                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -52,7 +54,7 @@ public class CounterApplicationIT
        public static final String TOPIC_OUT = "out";
 
        @Autowired
-       KafkaTemplate<String, Word> kafkaTemplate;
+       KafkaTemplate<String, TestInputWord> kafkaTemplate;
        @Autowired
        Consumer consumer;
 
@@ -67,8 +69,8 @@ public class CounterApplicationIT
        @Test
        void testSendMessage()
        {
-               Stream
-                               .of(TestData.INPUT_MESSAGES)
+               TestData
+                               .getInputMessages()
                                .forEach(word -> kafkaTemplate.send(TOPIC_IN, word.getUser(), word));
 
                await("Expected messages")
@@ -79,18 +81,18 @@ public class CounterApplicationIT
 
        static class Consumer
        {
-               private final MultiValueMap<Word, WordCounter> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) Word word,
-                               @Payload WordCounter counter)
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
+                               @Payload TestOutputWordCounter counter)
                {
                        log.debug("Received message: {} -> {}", word, counter);
                        received.add(word, counter);
                }
 
-               synchronized MultiValueMap<Word, WordCounter> getReceivedMessages()
+               synchronized MultiValueMap<TestOutputWord, TestOutputWordCounter> getReceivedMessages()
                {
                        return received;
                }
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
index 8e09d0c..e5964dc 100644
@@ -1,6 +1,10 @@
 package de.juplo.kafka.wordcount.counter;
 
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
@@ -10,16 +14,11 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Stream;
-
-import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
@@ -33,8 +32,8 @@ public class CounterStreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<String, Word> in;
-  TestOutputTopic<Word, WordCounter> out;
+  TestInputTopic<String, TestInputWord> in;
+  TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
 
 
   @BeforeEach
@@ -45,39 +44,32 @@ public class CounterStreamProcessorTopologyTest
         OUT,
         Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
 
-    CounterApplicationConfiguriation applicationConfiguriation =
-        new CounterApplicationConfiguriation();
-    Properties streamProcessorProperties =
-        applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
-    Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
-
-    JsonSerde<?> keySerde = new JsonSerde<>();
-    keySerde.configure(propertyMap, true);
-    JsonSerde<?> valueSerde = new JsonSerde<>();
-    valueSerde.configure(propertyMap, false);
-
-    testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+    testDriver = new TopologyTestDriver(topology, serializationConfig());
 
     in = testDriver.createInputTopic(
         IN,
-        (JsonSerializer<String>)keySerde.serializer(),
-        (JsonSerializer<Word>)valueSerde.serializer());
+        new StringSerializer(),
+        new JsonSerializer().noTypeInfo());
 
     out = testDriver.createOutputTopic(
         OUT,
-        (JsonDeserializer<Word>)keySerde.deserializer(),
-        (JsonDeserializer<WordCounter>)valueSerde.deserializer());
+        new JsonDeserializer()
+            .copyWithType(TestOutputWord.class)
+            .ignoreTypeHeaders(),
+        new JsonDeserializer()
+            .copyWithType(TestOutputWordCounter.class)
+            .ignoreTypeHeaders());
   }
 
 
   @Test
   public void test()
   {
-    Stream
-        .of(TestData.INPUT_MESSAGES)
+    TestData
+        .getInputMessages()
         .forEach(word -> in.pipeInput(word.getUser(), word));
 
-    MultiValueMap<Word, WordCounter> receivedMessages = new LinkedMultiValueMap<>();
+    MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
     out
         .readRecordsToList()
         .forEach(record ->
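
The test topics no longer use JsonSerde instances derived from the Spring bean: keys are piped in with a plain StringSerializer, values with a JsonSerializer configured via noTypeInfo(), and the output is read with JsonDeserializer instances bound to the test DTO types via copyWithType() that ignore the type headers written by the streams application. The point of noTypeInfo() is sketched below; the variable names are illustrative only, all class and method names are taken from the diff.

    // By default, spring-kafka's JsonSerializer records the payload class in a
    // __TypeId__ header of each produced record.
    JsonSerializer<TestInputWord> withTypeHeader = new JsonSerializer<>();

    // noTypeInfo() switches that header off, so a piped-in record looks like one
    // produced by the upstream splitter service, which knows nothing about the
    // counter's classes; the topology's own serde configuration then determines
    // the target type on the consuming side.
    JsonSerializer<TestInputWord> headerless = new JsonSerializer<TestInputWord>().noTypeInfo();
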
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
index 5dc8bc2..6419059 100644
@@ -1,14 +1,14 @@
 package de.juplo.kafka.wordcount.counter;
 
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -16,22 +16,27 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
-       static final Word[] INPUT_MESSAGES = new Word[]
+       private static final TestInputWord[] INPUT_MESSAGES = new TestInputWord[]
        {
-                       Word.of("peter","Hallo"),
-                       Word.of("klaus","Müsch"),
-                       Word.of("peter","Welt"),
-                       Word.of("klaus","Müsch"),
-                       Word.of("klaus","s"),
-                       Word.of("peter","Boäh"),
-                       Word.of("peter","Welt"),
-                       Word.of("peter","Boäh"),
-                       Word.of("klaus","s"),
-                       Word.of("peter","Boäh"),
-                       Word.of("klaus","s"),
+                       TestInputWord.of("peter","Hallo"),
+                       TestInputWord.of("klaus","Müsch"),
+                       TestInputWord.of("peter","Welt"),
+                       TestInputWord.of("klaus","Müsch"),
+                       TestInputWord.of("klaus","s"),
+                       TestInputWord.of("peter","Boäh"),
+                       TestInputWord.of("peter","Welt"),
+                       TestInputWord.of("peter","Boäh"),
+                       TestInputWord.of("klaus","s"),
+                       TestInputWord.of("peter","Boäh"),
+                       TestInputWord.of("klaus","s"),
        };
 
-       static void assertExpectedMessages(MultiValueMap<Word, WordCounter> receivedMessages)
+       static Stream<TestInputWord> getInputMessages()
+       {
+               return Stream.of(TestData.INPUT_MESSAGES);
+       }
+
+       static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
        {
                expectedMessages().forEach(
                                (word, counter) ->
@@ -39,64 +44,52 @@ class TestData
                                                                .containsExactlyElementsOf(counter));
        }
 
-       static final KeyValue<Word, WordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
+       private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       Word.of("peter","Hallo"),
-                                       WordCounter.of("peter","Hallo",1)),
+                                       TestOutputWord.of("peter","Hallo"),
+                                       TestOutputWordCounter.of("peter","Hallo",1)),
                        KeyValue.pair(
-                                       Word.of("klaus","Müsch"),
-                                       WordCounter.of("klaus","Müsch",1)),
+                                       TestOutputWord.of("klaus","Müsch"),
+                                       TestOutputWordCounter.of("klaus","Müsch",1)),
                        KeyValue.pair(
-                                       Word.of("peter","Welt"),
-                                       WordCounter.of("peter","Welt",1)),
+                                       TestOutputWord.of("peter","Welt"),
+                                       TestOutputWordCounter.of("peter","Welt",1)),
                        KeyValue.pair(
-                                       Word.of("klaus","Müsch"),
-                                       WordCounter.of("klaus","Müsch",2)),
+                                       TestOutputWord.of("klaus","Müsch"),
+                                       TestOutputWordCounter.of("klaus","Müsch",2)),
                        KeyValue.pair(
-                                       Word.of("klaus","s"),
-                                       WordCounter.of("klaus","s",1)),
+                                       TestOutputWord.of("klaus","s"),
+                                       TestOutputWordCounter.of("klaus","s",1)),
                        KeyValue.pair(
-                                       Word.of("peter","Boäh"),
-                                       WordCounter.of("peter","Boäh",1)),
+                                       TestOutputWord.of("peter","Boäh"),
+                                       TestOutputWordCounter.of("peter","Boäh",1)),
                        KeyValue.pair(
-                                       Word.of("peter","Welt"),
-                                       WordCounter.of("peter","Welt",2)),
+                                       TestOutputWord.of("peter","Welt"),
+                                       TestOutputWordCounter.of("peter","Welt",2)),
                        KeyValue.pair(
-                                       Word.of("peter","Boäh"),
-                                       WordCounter.of("peter","Boäh",2)),
+                                       TestOutputWord.of("peter","Boäh"),
+                                       TestOutputWordCounter.of("peter","Boäh",2)),
                        KeyValue.pair(
-                                       Word.of("klaus","s"),
-                                       WordCounter.of("klaus","s",2)),
+                                       TestOutputWord.of("klaus","s"),
+                                       TestOutputWordCounter.of("klaus","s",2)),
                        KeyValue.pair(
-                                       Word.of("peter","Boäh"),
-                                       WordCounter.of("peter","Boäh",3)),
+                                       TestOutputWord.of("peter","Boäh"),
+                                       TestOutputWordCounter.of("peter","Boäh",3)),
                        KeyValue.pair(
-                                       Word.of("klaus","s"),
-                                       WordCounter.of("klaus","s",3)),
+                                       TestOutputWord.of("klaus","s"),
+                                       TestOutputWordCounter.of("klaus","s",3)),
        };
 
-       static MultiValueMap<Word, WordCounter> expectedMessages()
+       static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
        {
-               MultiValueMap<Word, WordCounter> expectedMessages = new LinkedMultiValueMap<>();
+               MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
                Stream
                                .of(EXPECTED_MESSAGES)
                                .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
                return expectedMessages;
        }
 
-       static Map<String, Object> convertToMap(Properties properties)
-       {
-               return properties
-                               .entrySet()
-                               .stream()
-                               .collect(
-                                               Collectors.toMap(
-                                                               entry -> (String)entry.getKey(),
-                                                               entry -> entry.getValue()
-                                               ));
-       }
-
        static String parseHeader(Headers headers, String key)
        {
                Header header = headers.lastHeader(key);
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java
new file mode 100644
index 0000000..71ed1d9
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputWord
+{
+  String user;
+  String word;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
new file mode 100644
index 0000000..cfc2cae
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWord
+{
+  String user;
+  String word;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java
new file mode 100644
index 0000000..1b59387
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWordCounter
+{
+  String user;
+  String word;
+  long counter;
+}