From 9b0331dc42d23c19879377638d5833b5fcdf2281 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 13:01:08 +0200 Subject: [PATCH] recorder: 1.2.0 - The key is also serialized as JSON (new type `User`) --- .../wordcount/recorder/RecorderApplication.java | 3 +-- .../de/juplo/kafka/wordcount/recorder/User.java | 10 ++++++++++ .../wordcount/recorder/RecorderApplicationIT.java | 14 ++++++++------ .../juplo/kafka/wordcount/splitter/TestUser.java | 14 ++++++++++++++ 4 files changed, 33 insertions(+), 8 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/User.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java index 3928f2f..6702a70 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.recorder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -24,7 +23,7 @@ public class RecorderApplication Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/User.java b/src/main/java/de/juplo/kafka/wordcount/recorder/User.java new file mode 100644 index 0000000..1245cfe --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/User.java @@ -0,0 +1,10 @@ +package de.juplo.kafka.wordcount.recorder; + +import lombok.Value; + + +@Value(staticConstructor = "of") +public class User +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java index 9d73e7e..9a1d516 100644 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java @@ -1,6 +1,7 @@ package de.juplo.kafka.wordcount.recorder; import de.juplo.kafka.wordcount.splitter.TestRecording; +import de.juplo.kafka.wordcount.splitter.TestUser; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -36,9 +37,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.splitter.TestUser", "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", @@ -60,14 +62,14 @@ class RecorderApplicationIT @DisplayName("Posted messages are accepted and sent to Kafka") void userMessagesAreAcceptedAndSentToKafka() { - MultiValueMap recordings = new LinkedMultiValueMap<>(); + MultiValueMap recordings = new LinkedMultiValueMap<>(); TestData .getInputMessages() .forEach(kv -> { sendRedording(kv.key, kv.value); - recordings.add(kv.key, TestRecording.of(kv.key, kv.value)); + recordings.add(TestUser.of(kv.key), TestRecording.of(kv.key, kv.value)); }); @@ -102,18 +104,18 @@ class RecorderApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String user, + @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, @Payload TestRecording recording) { log.debug("Received message: {}={}", user, recording); received.add(user, recording); } - synchronized List receivedFor(String user) + synchronized List receivedFor(TestUser user) { List recordings = received.get(user); return recordings == null diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java new file mode 100644 index 0000000..60c0708 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestUser +{ + String user; +} -- 2.20.1