X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;h=891a43578a7bbb07e00a9a7916b7fedd139881e0;hb=47ed803476f547b79439b510d409c81d7d85db53;hp=743c06ee17497f63c34d64fea09ffbe5e3c8625d;hpb=5283e1688c50764d828f83f7d58d4cf1c7e56cf9;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index 743c06e..891a435 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,7 +1,7 @@ package de.juplo.kafka.wordcount.splitter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.recorder.TestRecording; +import de.juplo.kafka.wordcount.counter.TestOutputWord; +import de.juplo.kafka.wordcount.recorder.TestInputRecording; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -20,7 +20,6 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.stream.Stream; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_IN; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_OUT; @@ -52,15 +51,15 @@ public class SplitterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { - Stream - .of(TestData.INPUT_MESSAGES) + TestData + .getInputMessages() .forEach(kv -> { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -88,18 +87,18 @@ public class SplitterApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( @Header(KafkaHeaders.RECEIVED_KEY) String key, - @Payload TestWord value) + @Payload TestOutputWord value) { log.debug("Received message: {}={}", key, value); received.add(key, value); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; }