X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;h=e945b31e5c48dc5a1ebb85eb9d59891f259d37e0;hb=b95a6dae1b668f87ec14d0ace9b768ca89e338b3;hp=d1bbc0fb794280600901bc60ba70b6e7571c9788;hpb=72872effdc38ff886532c895a1a4ae1a8e6aa95c;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index d1bbc0f..e945b31 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,9 +1,11 @@ package de.juplo.kafka.wordcount.splitter; -import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.counter.TestOutputUser; +import de.juplo.kafka.wordcount.counter.TestOutputWord; +import de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.KeyValue; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -12,6 +14,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -19,22 +22,22 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.function.BiConsumer; -import java.util.stream.Stream; -import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; -import static org.assertj.core.api.Assertions.assertThat; +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_OUT; import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.TestOutputUser", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestOutputWord", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", @@ -47,113 +50,59 @@ public class SplitterApplicationIT public final static String TOPIC_IN = "in"; public final static String TOPIC_OUT = "out"; - @Autowired - KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; - - @BeforeEach - public void clear() + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) { - consumer.received.clear(); + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); } - static void writeInputData(BiConsumer consumer) - { - Recording recording; - - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Hallo Welt!"); - consumer.accept(recording.getUser(), recording); - - recording = new Recording(); - recording.setUser("klaus"); - recording.setSentence("Müsch gäb's auch!"); - consumer.accept(recording.getUser(), recording); - - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Boäh, echt! ß mal nä Nümmäh!"); - consumer.accept(recording.getUser(), recording); - } @Test void testSendMessage() throws Exception { - writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); - - await("Expexted converted data") + await("Expected converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> - assertExpectedMessages(consumer.getReceivedMessages())); + TestData.assertExpectedMessages(consumer.getReceivedMessages())); } - static void assertExpectedMessages(MultiValueMap receivedMessages) - { - MultiValueMap expected = new LinkedMultiValueMap<>(); - expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); - await("Received expected messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> expected.forEach((user, word) -> - assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); - } - - static Stream> expectedMessages = Stream.of( - KeyValue.pair( - "peter", - new TestWord("peter", "Hallo")), - KeyValue.pair( - "peter", - new TestWord("peter", "Welt")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "Müsch")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "gäb")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "s")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "auch")), - KeyValue.pair( - "peter", - new TestWord("peter", "Boäh")), - KeyValue.pair( - "peter", - new TestWord("peter", "echt")), - KeyValue.pair( - "peter", - new TestWord("peter", "ß")), - KeyValue.pair( - "peter", - new TestWord("peter", "mal")), - KeyValue.pair( - "peter", - new TestWord("peter", "nä")), - KeyValue.pair( - "peter", - new TestWord("peter", "Nümmäh"))); - - static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String key, - @Payload TestWord value) + @Header(KafkaHeaders.RECEIVED_KEY) TestOutputUser key, + @Payload TestOutputWord value) { log.debug("Received message: {}={}", key, value); received.add(key, value); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; }