X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterApplicationIT.java;h=d1bbc0fb794280600901bc60ba70b6e7571c9788;hb=72872effdc38ff886532c895a1a4ae1a8e6aa95c;hp=92dfd1076ee068763826f56d17e2622cba57ac13;hpb=a41c63a03ef29ddc081c58f2b4e73fa27fb2316b;p=demos%2Fkafka%2Fwordcount

diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
index 92dfd10..d1bbc0f 100644
--- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java
@@ -1,8 +1,8 @@
 package de.juplo.kafka.wordcount.splitter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.counter.TestWord;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.KeyValue;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -11,35 +11,44 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
 
 import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.*;
+import static org.awaitility.Awaitility.await;
 
 
 @SpringBootTest(
 		properties = {
-				"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+				"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+				"spring.kafka.producer.properties.spring.json.add.type.headers=false",
+				"spring.kafka.consumer.auto-offset-reset=earliest",
+				"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+				"spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord",
+				"spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter",
+				"logging.level.root=WARN",
+				"logging.level.de.juplo=DEBUG",
 				"juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
 				"juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
 				"juplo.wordcount.splitter.output-topic=" + TOPIC_OUT })
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
 @Slf4j
 public class SplitterApplicationIT
 {
 	public final static String TOPIC_IN = "in";
 	public final static String TOPIC_OUT = "out";
-	static final int PARTITIONS = 2;
 
 	@Autowired
-	KafkaTemplate<String, String> kafkaTemplate;
-	@Autowired
-	ObjectMapper mapper;
+	KafkaTemplate<String, Recording> kafkaTemplate;
 	@Autowired
 	Consumer consumer;
 
@@ -50,58 +59,107 @@ public class SplitterApplicationIT
 		consumer.received.clear();
 	}
 
-
-	@Test
-	void testSendMessage() throws Exception
+	static void writeInputData(BiConsumer<String, Recording> consumer)
 	{
-		Recording recording = new Recording();
+		Recording recording;
+
+		recording = new Recording();
 		recording.setUser("peter");
 		recording.setSentence("Hallo Welt!");
-		kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording));
+		consumer.accept(recording.getUser(), recording);
+
+		recording = new Recording();
 		recording.setUser("klaus");
 		recording.setSentence("Müsch gäb's auch!");
-		kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording));
+		consumer.accept(recording.getUser(), recording);
+
+		recording = new Recording();
 		recording.setUser("peter");
 		recording.setSentence("Boäh, echt! ß mal nä Nümmäh!");
-		kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording));
-
-		String peter1 = mapper.writeValueAsString(Word.of("peter", "Hallo"));
-		String peter2 = mapper.writeValueAsString(Word.of("peter", "Welt"));
-		String peter3 = mapper.writeValueAsString(Word.of("peter", "Boäh"));
-		String peter4 = mapper.writeValueAsString(Word.of("peter", "echt"));
-		String peter5 = mapper.writeValueAsString(Word.of("peter", "ß"));
-		String peter6 = mapper.writeValueAsString(Word.of("peter", "mal"));
-		String peter7 = mapper.writeValueAsString(Word.of("peter", "nä"));
-		String peter8 = mapper.writeValueAsString(Word.of("peter", "Nümmäh"));
-
-		String klaus1 = mapper.writeValueAsString(Word.of("klaus","Müsch"));
-		String klaus2 = mapper.writeValueAsString(Word.of("klaus","gäb"));
-		String klaus3 = mapper.writeValueAsString(Word.of("klaus","s"));
-		String klaus4 = mapper.writeValueAsString(Word.of("klaus","auch"));
+		consumer.accept(recording.getUser(), recording);
+	}
+
+	@Test
+	void testSendMessage() throws Exception
+	{
+		writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording));
 
 		await("Expexted converted data")
 				.atMost(Duration.ofSeconds(5))
 				.untilAsserted(() ->
-				{
-					assertThat(consumer.received).hasSize(2);
-					assertThat(consumer.received.get("klaus")).containsExactly(klaus1, klaus2, klaus3, klaus4);
-					assertThat(consumer.received.get("peter")).containsExactly(peter1, peter2, peter3, peter4, peter5, peter6, peter7, peter8);
-				});
+						assertExpectedMessages(consumer.getReceivedMessages()));
+	}
+
+
+	static void assertExpectedMessages(MultiValueMap<String, TestWord> receivedMessages)
+	{
+		MultiValueMap<String, TestWord> expected = new LinkedMultiValueMap<>();
+		expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value));
+		await("Received expected messages")
+				.atMost(Duration.ofSeconds(5))
+				.untilAsserted(() -> expected.forEach((user, word) ->
+						assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word)));
 	}
 
+	static Stream<KeyValue<String, TestWord>> expectedMessages = Stream.of(
+			KeyValue.pair(
+					"peter",
+					new TestWord("peter", "Hallo")),
+			KeyValue.pair(
+					"peter",
+					new TestWord("peter", "Welt")),
+			KeyValue.pair(
+					"klaus",
+					new TestWord("klaus", "Müsch")),
+			KeyValue.pair(
+					"klaus",
+					new TestWord("klaus", "gäb")),
+			KeyValue.pair(
+					"klaus",
+					new TestWord("klaus", "s")),
+			KeyValue.pair(
+					"klaus",
+					new TestWord("klaus", "auch")),
+			KeyValue.pair(
+					"peter",
+					new TestWord("peter", "Boäh")),
+			KeyValue.pair(
+					"peter",
+					new TestWord("peter", "echt")),
+			KeyValue.pair(
+					"peter",
+					new TestWord("peter", "ß")),
+			KeyValue.pair(
+					"peter",
+					new TestWord("peter", "mal")),
+			KeyValue.pair(
+					"peter",
+					new TestWord("peter", "nä")),
+			KeyValue.pair(
+					"peter",
+					new TestWord("peter", "Nümmäh")));
+
 
 	static class Consumer
 	{
-		final MultiValueMap<String, String> received = new LinkedMultiValueMap<>();
+		private final MultiValueMap<String, TestWord> received = new LinkedMultiValueMap<>();
 
 		@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-		public void receive(ConsumerRecord<String, String> record)
+		public synchronized void receive(
+				@Header(KafkaHeaders.RECEIVED_KEY) String key,
+				@Payload TestWord value)
 		{
-			log.debug("Received message: {}", record);
-			received.add(record.key(), record.value());
+			log.debug("Received message: {}={}", key, value);
+			received.add(key, value);
+		}
+
+		synchronized MultiValueMap<String, TestWord> getReceivedMessages()
+		{
+			return received;
 		}
 	}
 
+
 	@TestConfiguration
 	static class Configuration
 	{