X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FTestData.java;h=a3f757566813fdf9ff77d30040bd513e8d6185bc;hb=53b03696e9e3890d9aeca795482cf66252449fad;hp=d1bbc0fb794280600901bc60ba70b6e7571c9788;hpb=d7fa29b35fef537b65e9110f708e781925ebff9f;p=demos%2Fkafka%2Fwordcount

diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java
index d1bbc0f..a3f7575 100644
--- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java
+++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java
@@ -1,172 +1,96 @@
 package de.juplo.kafka.wordcount.splitter;
 
 import de.juplo.kafka.wordcount.counter.TestWord;
-import lombok.extern.slf4j.Slf4j;
+import de.juplo.kafka.wordcount.recorder.TestRecording;
 import org.apache.kafka.streams.KeyValue;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
-import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
-import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 
 
-@SpringBootTest(
-    properties = {
-        "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-        "spring.kafka.producer.properties.spring.json.add.type.headers=false",
-        "spring.kafka.consumer.auto-offset-reset=earliest",
-        "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-        "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord",
-        "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter",
-        "logging.level.root=WARN",
-        "logging.level.de.juplo=DEBUG",
-        "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
-        "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT })
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
-@Slf4j
-public class SplitterApplicationIT
+public class TestData
 {
-  public final static String TOPIC_IN = "in";
-  public final static String TOPIC_OUT = "out";
+  static final String PETER = "peter";
+  static final String KLAUS = "klaus";
 
 
-  @Autowired
-  KafkaTemplate<String, Recording> kafkaTemplate;
-  @Autowired
-  Consumer consumer;
-
-  @BeforeEach
-  public void clear()
+  static final Stream<KeyValue<String, TestRecording>> getInputMessages()
   {
-    consumer.received.clear();
-  }
-
-  static void writeInputData(BiConsumer<String, Recording> consumer)
-  {
-    Recording recording;
-
-    recording = new Recording();
-    recording.setUser("peter");
-    recording.setSentence("Hallo Welt!");
-    consumer.accept(recording.getUser(), recording);
-
-    recording = new Recording();
-    recording.setUser("klaus");
-    recording.setSentence("Müsch gäb's auch!");
-    consumer.accept(recording.getUser(), recording);
-
-    recording = new Recording();
-    recording.setUser("peter");
-    recording.setSentence("Boäh, echt! ß mal nä Nümmäh!");
-    consumer.accept(recording.getUser(), recording);
+    return Stream.of(INPUT_MESSAGES);
   }
 
-  @Test
-  void testSendMessage() throws Exception
+  private static final KeyValue<String, TestRecording>[] INPUT_MESSAGES = new KeyValue[]
   {
-    writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording));
-
-    await("Expexted converted data")
-        .atMost(Duration.ofSeconds(5))
-        .untilAsserted(() ->
-            assertExpectedMessages(consumer.getReceivedMessages()));
-  }
-
+      new KeyValue<>(
+          PETER,
+          TestRecording.of(PETER, "Hallo Welt!")),
+      new KeyValue<>(
+          KLAUS,
+          TestRecording.of(KLAUS, "Müsch gäb's auch!")),
+      new KeyValue<>(
+          PETER,
+          TestRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")),
+  };
 
   static void assertExpectedMessages(MultiValueMap<String, TestWord> receivedMessages)
   {
-    MultiValueMap<String, TestWord> expected = new LinkedMultiValueMap<>();
-    expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value));
     await("Received expected messages")
         .atMost(Duration.ofSeconds(5))
-        .untilAsserted(() -> expected.forEach((user, word) ->
+        .untilAsserted(() -> expectedMessages().forEach((user, word) ->
             assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
   }
 
-  static Stream<KeyValue<String, TestWord>> expectedMessages = Stream.of(
+  private static final KeyValue<String, TestWord>[] EXPECTED_MESSAGES = new KeyValue[]
+  {
       KeyValue.pair(
           "peter",
-          new TestWord("peter", "Hallo")),
+          TestWord.of("peter", "Hallo")),
       KeyValue.pair(
          "peter",
-          new TestWord("peter", "Welt")),
+          TestWord.of("peter", "Welt")),
       KeyValue.pair(
          "klaus",
-          new TestWord("klaus", "Müsch")),
+          TestWord.of("klaus", "Müsch")),
       KeyValue.pair(
          "klaus",
-          new TestWord("klaus", "gäb")),
+          TestWord.of("klaus", "gäb")),
       KeyValue.pair(
          "klaus",
-          new TestWord("klaus", "s")),
+          TestWord.of("klaus", "s")),
       KeyValue.pair(
          "klaus",
-          new TestWord("klaus", "auch")),
+          TestWord.of("klaus", "auch")),
       KeyValue.pair(
          "peter",
-          new TestWord("peter", "Boäh")),
+          TestWord.of("peter", "Boäh")),
       KeyValue.pair(
          "peter",
-          new TestWord("peter", "echt")),
+          TestWord.of("peter", "echt")),
       KeyValue.pair(
          "peter",
-          new TestWord("peter", "ß")),
+          TestWord.of("peter", "ß")),
       KeyValue.pair(
          "peter",
-          new TestWord("peter", "mal")),
+          TestWord.of("peter", "mal")),
       KeyValue.pair(
          "peter",
-          new TestWord("peter", "nä")),
+          TestWord.of("peter", "nä")),
       KeyValue.pair(
          "peter",
-          new TestWord("peter", "Nümmäh")));
-
-  static class Consumer
-  {
-    private final MultiValueMap<String, TestWord> received = new LinkedMultiValueMap<>();
-
-    @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-    public synchronized void receive(
-        @Header(KafkaHeaders.RECEIVED_KEY) String key,
-        @Payload TestWord value)
-    {
-      log.debug("Received message: {}={}", key, value);
-      received.add(key, value);
-    }
-
-    synchronized MultiValueMap<String, TestWord> getReceivedMessages()
-    {
-      return received;
-    }
-  }
-
+          TestWord.of("peter", "Nümmäh")),
+  };
 
-  @TestConfiguration
-  static class Configuration
+  static MultiValueMap<String, TestWord> expectedMessages()
   {
-    @Bean
-    Consumer consumer()
-    {
-      return new Consumer();
-    }
+    MultiValueMap<String, TestWord> expectedMessages = new LinkedMultiValueMap<>();
+    Stream
+        .of(EXPECTED_MESSAGES)
+        .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+    return expectedMessages;
   }
 }