recorder: 1.2.0 - The key is also serialized as JSON (new type `User`)
authorKai Moritz <kai@juplo.de>
Sun, 2 Jun 2024 11:01:08 +0000 (13:01 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 6 Jun 2024 18:42:42 +0000 (20:42 +0200)
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java
src/main/java/de/juplo/kafka/wordcount/recorder/User.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java [new file with mode: 0644]

index 3928f2f..6702a70 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.recorder;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -24,7 +23,7 @@ public class RecorderApplication
 
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
                props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
 
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/User.java b/src/main/java/de/juplo/kafka/wordcount/recorder/User.java
new file mode 100644 (file)
index 0000000..1245cfe
--- /dev/null
@@ -0,0 +1,10 @@
+package de.juplo.kafka.wordcount.recorder;
+
+import lombok.Value;
+
+
+@Value(staticConstructor = "of")
+public class User
+{
+  String user;
+}
index 9d73e7e..9a1d516 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.wordcount.recorder;
 
 import de.juplo.kafka.wordcount.splitter.TestRecording;
+import de.juplo.kafka.wordcount.splitter.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -36,9 +37,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @SpringBootTest(
                properties = {
                                "spring.kafka.consumer.auto-offset-reset=earliest",
+                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.splitter.TestUser",
                                "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording",
-                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -60,14 +62,14 @@ class RecorderApplicationIT
        @DisplayName("Posted messages are accepted and sent to Kafka")
        void userMessagesAreAcceptedAndSentToKafka()
        {
-               MultiValueMap<String, TestRecording> recordings = new LinkedMultiValueMap<>();
+               MultiValueMap<TestUser, TestRecording> recordings = new LinkedMultiValueMap<>();
 
                TestData
                                .getInputMessages()
                                .forEach(kv ->
                                {
                                        sendRedording(kv.key, kv.value);
-                                       recordings.add(kv.key, TestRecording.of(kv.key, kv.value));
+                                       recordings.add(TestUser.of(kv.key), TestRecording.of(kv.key, kv.value));
                                });
 
 
@@ -102,18 +104,18 @@ class RecorderApplicationIT
 
        static class Consumer
        {
-               private final MultiValueMap<String, TestRecording> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<TestUser, TestRecording> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) String user,
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
                                @Payload TestRecording recording)
                {
                        log.debug("Received message: {}={}", user, recording);
                        received.add(user, recording);
                }
 
-               synchronized List<TestRecording> receivedFor(String user)
+               synchronized List<TestRecording> receivedFor(TestUser user)
                {
                        List<TestRecording> recordings = received.get(user);
                        return recordings == null
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java
new file mode 100644 (file)
index 0000000..60c0708
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestUser
+{
+  String user;
+}