recorder: 1.2.2 - RocksDB does nor work in Alpine-Linux
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / recorder / RecorderApplicationIT.java
index 3879d38..9a1d516 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.wordcount.recorder;
 
 import de.juplo.kafka.wordcount.splitter.TestRecording;
+import de.juplo.kafka.wordcount.splitter.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -22,9 +23,8 @@ import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
 import java.util.List;
-import java.util.stream.Stream;
 
-import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.recorder.RecorderApplicationIT.TOPIC_OUT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch;
@@ -37,9 +37,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @SpringBootTest(
                properties = {
                                "spring.kafka.consumer.auto-offset-reset=earliest",
+                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.splitter.TestUser",
                                "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording",
-                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -47,7 +48,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = { TOPIC_OUT })
 @Slf4j
-class ApplicationTests
+class RecorderApplicationIT
 {
        static final String TOPIC_OUT = "out";
 
@@ -61,18 +62,14 @@ class ApplicationTests
        @DisplayName("Posted messages are accepted and sent to Kafka")
        void userMessagesAreAcceptedAndSentToKafka()
        {
-               MultiValueMap<String, TestRecording> recordings = new LinkedMultiValueMap<>();
-
-               Stream
-                               .of(
-                                               new TestRecording("päter", "Hall° Wält?¢*&%€!"),
-                                               new TestRecording("päter", "Hallo Welt!"),
-                                               new TestRecording("klühs", "Müsch gäb's auch!"),
-                                               new TestRecording("päter", "Boäh, echt! ß mal nä Nümmäh!"))
-                               .forEach(recording ->
+               MultiValueMap<TestUser, TestRecording> recordings = new LinkedMultiValueMap<>();
+
+               TestData
+                               .getInputMessages()
+                               .forEach(kv ->
                                {
-                                       sendRedording(recording.getUser(), recording.getSentence());
-                                       recordings.add(recording.getUser(), recording);
+                                       sendRedording(kv.key, kv.value);
+                                       recordings.add(TestUser.of(kv.key), TestRecording.of(kv.key, kv.value));
                                });
 
 
@@ -107,18 +104,18 @@ class ApplicationTests
 
        static class Consumer
        {
-               private final MultiValueMap<String, TestRecording> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<TestUser, TestRecording> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) String user,
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
                                @Payload TestRecording recording)
                {
                        log.debug("Received message: {}={}", user, recording);
                        received.add(user, recording);
                }
 
-               synchronized List<TestRecording> receivedFor(String user)
+               synchronized List<TestRecording> receivedFor(TestUser user)
                {
                        List<TestRecording> recordings = received.get(user);
                        return recordings == null