splitter: 1.2.0 - A domain-class (``User``) is used as key
authorKai Moritz <kai@juplo.de>
Sun, 2 Jun 2024 16:41:02 +0000 (18:41 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 4 Jun 2024 21:25:54 +0000 (23:25 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/splitter/User.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java

index ead41f8..7143e1a 100644 (file)
@@ -33,9 +33,10 @@ public class SplitterApplicationConfiguration
 
                propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
                propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
+               propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, SplitterApplication.class.getName());
+               propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
                propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName());
                propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
                propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
index 60c569b..d0070c0 100644 (file)
@@ -25,7 +25,7 @@ public class SplitterStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KStream<String, Recording> source = builder.stream(inputTopic);
+               KStream<User, Recording> source = builder.stream(inputTopic);
 
                source
                                .flatMapValues(recording -> Arrays
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/User.java b/src/main/java/de/juplo/kafka/wordcount/splitter/User.java
new file mode 100644 (file)
index 0000000..8a65695
--- /dev/null
@@ -0,0 +1,12 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+  String user;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java
new file mode 100644 (file)
index 0000000..4406b3b
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputUser
+{
+  String user;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java
new file mode 100644 (file)
index 0000000..ce413ba
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.recorder;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputUser
+{
+  String user;
+}
index 891a435..e945b31 100644 (file)
@@ -1,7 +1,9 @@
 package de.juplo.kafka.wordcount.splitter;
 
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
 import de.juplo.kafka.wordcount.counter.TestOutputWord;
 import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -28,12 +30,14 @@ import static org.awaitility.Awaitility.await;
 
 @SpringBootTest(
                properties = {
+                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
                                "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
                                "spring.kafka.producer.properties.spring.json.add.type.headers=false",
                                "spring.kafka.consumer.auto-offset-reset=earliest",
+                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord",
-                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.TestOutputUser",
+                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestOutputWord",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -51,7 +55,7 @@ public class SplitterApplicationIT
 
        @BeforeAll
        public static void testSendMessage(
-                       @Autowired KafkaTemplate<String, TestInputRecording> kafkaTemplate)
+                       @Autowired KafkaTemplate<TestInputUser, TestInputRecording> kafkaTemplate)
        {
                TestData
                                .getInputMessages()
@@ -59,7 +63,7 @@ public class SplitterApplicationIT
                                {
                                        try
                                        {
-                                               SendResult<String, TestInputRecording> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               SendResult<TestInputUser, TestInputRecording> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
                                                log.info(
                                                                "Sent: {}={}, partition={}, offset={}",
                                                                result.getProducerRecord().key(),
@@ -78,7 +82,7 @@ public class SplitterApplicationIT
        @Test
        void testSendMessage() throws Exception
        {
-               await("Expexted converted data")
+               await("Expected converted data")
                                .atMost(Duration.ofSeconds(5))
                                .untilAsserted(() ->
                                                TestData.assertExpectedMessages(consumer.getReceivedMessages()));
@@ -87,18 +91,18 @@ public class SplitterApplicationIT
 
        static class Consumer
        {
-               private final MultiValueMap<String, TestOutputWord> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<TestOutputUser, TestOutputWord> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) String key,
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestOutputUser key,
                                @Payload TestOutputWord value)
                {
                        log.debug("Received message: {}={}", key, value);
                        received.add(key, value);
                }
 
-               synchronized MultiValueMap<String, TestOutputWord> getReceivedMessages()
+               synchronized MultiValueMap<TestOutputUser, TestOutputWord> getReceivedMessages()
                {
                        return received;
                }
index feedb1e..f89b099 100644 (file)
@@ -1,7 +1,9 @@
 package de.juplo.kafka.wordcount.splitter;
 
+import de.juplo.kafka.wordcount.counter.TestOutputUser;
 import de.juplo.kafka.wordcount.counter.TestOutputWord;
 import de.juplo.kafka.wordcount.recorder.TestInputRecording;
+import de.juplo.kafka.wordcount.recorder.TestInputUser;
 import org.apache.kafka.streams.KeyValue;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
@@ -15,29 +17,29 @@ import static org.awaitility.Awaitility.await;
 
 public class TestData
 {
-       static final String PETER = "peter";
-       static final String KLAUS = "klaus";
+       static final TestInputUser PETER = TestInputUser.of("peter");
+       static final TestInputUser KLAUS = TestInputUser.of("klaus");
 
 
-       static final Stream<KeyValue<String, TestInputRecording>> getInputMessages()
+       static final Stream<KeyValue<TestInputUser, TestInputRecording>> getInputMessages()
        {
                return Stream.of(INPUT_MESSAGES);
        }
 
-       private static final KeyValue<String, TestInputRecording>[] INPUT_MESSAGES = new KeyValue[]
+       private static final KeyValue<TestInputUser, TestInputRecording>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
                                        PETER,
-                                       TestInputRecording.of(PETER, "Hallo Welt!")),
+                                       TestInputRecording.of(PETER.getUser(), "Hallo Welt!")),
                        new KeyValue<>(
                                        KLAUS,
-                                       TestInputRecording.of(KLAUS, "Müsch gäb's auch!")),
+                                       TestInputRecording.of(KLAUS.getUser(), "Müsch gäb's auch!")),
                        new KeyValue<>(
                                        PETER,
-                                       TestInputRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")),
+                                       TestInputRecording.of(PETER.getUser(), "Boäh, echt! ß mal nä Nümmäh!")),
        };
 
-       static void assertExpectedMessages(MultiValueMap<String, TestOutputWord> receivedMessages)
+       static void assertExpectedMessages(MultiValueMap<TestOutputUser, TestOutputWord> receivedMessages)
        {
                await("Received expected messages")
                                .atMost(Duration.ofSeconds(5))
@@ -45,49 +47,49 @@ public class TestData
                                                assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
        }
 
-       private static final KeyValue<String, TestOutputWord>[] EXPECTED_MESSAGES = new KeyValue[]
+       private static final KeyValue<TestOutputUser, TestOutputWord>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       "peter",
-                                       TestOutputWord.of("peter", "Hallo")),
+                                       TestOutputUser.of(PETER.getUser()),
+                                       TestOutputWord.of(PETER.getUser(), "Hallo")),
                        KeyValue.pair(
-                                       "peter",
-                                       TestOutputWord.of("peter", "Welt")),
+                                       TestOutputUser.of(PETER.getUser()),
+                                       TestOutputWord.of(PETER.getUser(), "Welt")),
                        KeyValue.pair(
-                                       "klaus",
-                                       TestOutputWord.of("klaus", "Müsch")),
+                                       TestOutputUser.of(KLAUS.getUser()),
+                                       TestOutputWord.of(KLAUS.getUser(), "Müsch")),
                        KeyValue.pair(
-                                       "klaus",
-                                       TestOutputWord.of("klaus", "gäb")),
+                                       TestOutputUser.of(KLAUS.getUser()),
+                                       TestOutputWord.of(KLAUS.getUser(), "gäb")),
                        KeyValue.pair(
-                                       "klaus",
-                                       TestOutputWord.of("klaus", "s")),
+                                       TestOutputUser.of(KLAUS.getUser()),
+                                       TestOutputWord.of(KLAUS.getUser(), "s")),
                        KeyValue.pair(
-                                       "klaus",
-                                       TestOutputWord.of("klaus", "auch")),
+                                       TestOutputUser.of(KLAUS.getUser()),
+                                       TestOutputWord.of(KLAUS.getUser(), "auch")),
                        KeyValue.pair(
-                                       "peter",
-                                       TestOutputWord.of("peter", "Boäh")),
+                                       TestOutputUser.of(PETER.getUser()),
+                                       TestOutputWord.of(PETER.getUser(), "Boäh")),
                        KeyValue.pair(
-                                       "peter",
-                                       TestOutputWord.of("peter", "echt")),
+                                       TestOutputUser.of(PETER.getUser()),
+                                       TestOutputWord.of(PETER.getUser(), "echt")),
                        KeyValue.pair(
-                                       "peter",
-                                       TestOutputWord.of("peter", "ß")),
+                                       TestOutputUser.of(PETER.getUser()),
+                                       TestOutputWord.of(PETER.getUser(), "ß")),
                        KeyValue.pair(
-                                       "peter",
-                                       TestOutputWord.of("peter", "mal")),
+                                       TestOutputUser.of(PETER.getUser()),
+                                       TestOutputWord.of(PETER.getUser(), "mal")),
                        KeyValue.pair(
-                                       "peter",
-                                       TestOutputWord.of("peter", "nä")),
+                                       TestOutputUser.of(PETER.getUser()),
+                                       TestOutputWord.of(PETER.getUser(), "nä")),
                        KeyValue.pair(
-                                       "peter",
-                                       TestOutputWord.of("peter", "Nümmäh")),
+                                       TestOutputUser.of(PETER.getUser()),
+                                       TestOutputWord.of(PETER.getUser(), "Nümmäh")),
        };
 
-       static MultiValueMap<String, TestOutputWord> expectedMessages()
+       static MultiValueMap<TestOutputUser, TestOutputWord> expectedMessages()
        {
-               MultiValueMap<String, TestOutputWord> expectedMessages = new LinkedMultiValueMap<>();
+               MultiValueMap<TestOutputUser, TestOutputWord> expectedMessages = new LinkedMultiValueMap<>();
                Stream
                                .of(EXPECTED_MESSAGES)
                                .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));