recorder: 1.2.0 - The key is also serialized as JSON (new type `User`)
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / recorder / RecorderApplicationIT.java
index 9d73e7e..9a1d516 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.wordcount.recorder;
 
 import de.juplo.kafka.wordcount.splitter.TestRecording;
+import de.juplo.kafka.wordcount.splitter.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -36,9 +37,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @SpringBootTest(
                properties = {
                                "spring.kafka.consumer.auto-offset-reset=earliest",
+                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.splitter.TestUser",
                                "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording",
-                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -60,14 +62,14 @@ class RecorderApplicationIT
        @DisplayName("Posted messages are accepted and sent to Kafka")
        void userMessagesAreAcceptedAndSentToKafka()
        {
-               MultiValueMap<String, TestRecording> recordings = new LinkedMultiValueMap<>();
+               MultiValueMap<TestUser, TestRecording> recordings = new LinkedMultiValueMap<>();
 
                TestData
                                .getInputMessages()
                                .forEach(kv ->
                                {
                                        sendRedording(kv.key, kv.value);
-                                       recordings.add(kv.key, TestRecording.of(kv.key, kv.value));
+                                       recordings.add(TestUser.of(kv.key), TestRecording.of(kv.key, kv.value));
                                });
 
 
@@ -102,18 +104,18 @@ class RecorderApplicationIT
 
        static class Consumer
        {
-               private final MultiValueMap<String, TestRecording> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<TestUser, TestRecording> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) String user,
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
                                @Payload TestRecording recording)
                {
                        log.debug("Received message: {}={}", user, recording);
                        received.add(user, recording);
                }
 
-               synchronized List<TestRecording> receivedFor(String user)
+               synchronized List<TestRecording> receivedFor(TestUser user)
                {
                        List<TestRecording> recordings = received.get(user);
                        return recordings == null