X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;h=e945b31e5c48dc5a1ebb85eb9d59891f259d37e0;hb=b95a6dae1b668f87ec14d0ace9b768ca89e338b3;hp=891a43578a7bbb07e00a9a7916b7fedd139881e0;hpb=47ed803476f547b79439b510d409c81d7d85db53;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index 891a435..e945b31 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,7 +1,9 @@ package de.juplo.kafka.wordcount.splitter; +import de.juplo.kafka.wordcount.counter.TestOutputUser; import de.juplo.kafka.wordcount.counter.TestOutputWord; import de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -28,12 +30,14 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.TestOutputUser", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestOutputWord", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", @@ -51,7 +55,7 @@ public class SplitterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -59,7 +63,7 @@ public class SplitterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -78,7 +82,7 @@ public class SplitterApplicationIT @Test void testSendMessage() throws Exception { - await("Expexted converted data") + await("Expected converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); @@ -87,18 +91,18 @@ public class SplitterApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String key, + @Header(KafkaHeaders.RECEIVED_KEY) TestOutputUser key, @Payload TestOutputWord value) { log.debug("Received message: {}={}", key, value); received.add(key, value); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; }