X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Frecorder%2FRecorderApplicationIT.java;h=9a1d516f1821a622d8005b69725f333b27e3cbb0;hb=9b0331dc42d23c19879377638d5833b5fcdf2281;hp=9d73e7ef0ef78d4c2e992965060713971b8d3f16;hpb=2893aaa541057a9ab6a253b1995307c1f9fe0ed8;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java index 9d73e7e..9a1d516 100644 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java @@ -1,6 +1,7 @@ package de.juplo.kafka.wordcount.recorder; import de.juplo.kafka.wordcount.splitter.TestRecording; +import de.juplo.kafka.wordcount.splitter.TestUser; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -36,9 +37,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.splitter.TestUser", "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", @@ -60,14 +62,14 @@ class RecorderApplicationIT @DisplayName("Posted messages are accepted and sent to Kafka") void userMessagesAreAcceptedAndSentToKafka() { - MultiValueMap recordings = new LinkedMultiValueMap<>(); + MultiValueMap recordings = new LinkedMultiValueMap<>(); TestData .getInputMessages() .forEach(kv -> { sendRedording(kv.key, kv.value); - recordings.add(kv.key, TestRecording.of(kv.key, kv.value)); + recordings.add(TestUser.of(kv.key), TestRecording.of(kv.key, kv.value)); }); @@ -102,18 +104,18 @@ class RecorderApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String user, + @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, @Payload TestRecording recording) { log.debug("Received message: {}={}", user, recording); received.add(user, recording); } - synchronized List receivedFor(String user) + synchronized List receivedFor(TestUser user) { List recordings = received.get(user); return recordings == null