X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;h=891a43578a7bbb07e00a9a7916b7fedd139881e0;hb=47ed803476f547b79439b510d409c81d7d85db53;hp=a702e1d1be7d711b425c60020b7f24af412df818;hpb=cfed6b631771687b645375394c422ec0c02720f5;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index a702e1d..891a435 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,9 +1,9 @@ package de.juplo.kafka.wordcount.splitter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.recorder.TestRecording; +import de.juplo.kafka.wordcount.counter.TestOutputWord; +import de.juplo.kafka.wordcount.recorder.TestInputRecording; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -12,6 +12,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -20,7 +21,8 @@ import org.springframework.util.MultiValueMap; import java.time.Duration; -import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_OUT; import static org.awaitility.Awaitility.await; @@ -44,23 +46,38 @@ public class SplitterApplicationIT public final static String TOPIC_IN = "in"; public final static String TOPIC_OUT = "out"; - @Autowired - KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; - - @BeforeEach - public void clear() + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) { - consumer.received.clear(); + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); } + @Test void testSendMessage() throws Exception { - TestData.writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); - await("Expexted converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> @@ -70,18 +87,18 @@ public class SplitterApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( @Header(KafkaHeaders.RECEIVED_KEY) String key, - @Payload TestWord value) + @Payload TestOutputWord value) { log.debug("Received message: {}={}", key, value); received.add(key, value); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; }