From 5dade68933e284e71667e0bea4ac578d2ec3f1d6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 May 2024 11:49:57 +0200 Subject: [PATCH 01/16] splitter: 1.2.0 - Renamed test into `SplitterApplicationIT` -- MOVE --- .../{SplitterApplicationTests.java => SplitterApplicationIT.java} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/test/java/de/juplo/kafka/wordcount/splitter/{SplitterApplicationTests.java => SplitterApplicationIT.java} (100%) diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java similarity index 100% rename from src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java rename to src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java -- 2.20.1 From a41c63a03ef29ddc081c58f2b4e73fa27fb2316b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 May 2024 11:50:01 +0200 Subject: [PATCH 02/16] splitter: 1.2.0 - Renamed test into `SplitterApplicationIT` -- ALIGN --- pom.xml | 5 ++++- .../kafka/wordcount/splitter/SplitterApplicationIT.java | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 51f085a..5b9505c 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount splitter - 1.1.6 + 1.2.0 Wordcount-Splitter Stream-processor of the multi-user wordcount-example, that splits the sentences up into single words @@ -75,6 +75,9 @@ + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index 3fa5851..92dfd10 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -17,7 +17,7 @@ import org.springframework.util.MultiValueMap; import 
java.time.Duration; -import static de.juplo.kafka.wordcount.splitter.SplitterApplicationTests.*; +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.*; @@ -30,7 +30,7 @@ import static org.awaitility.Awaitility.*; "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT }) @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) @Slf4j -public class SplitterApplicationTests +public class SplitterApplicationIT { public final static String TOPIC_IN = "in"; public final static String TOPIC_OUT = "out"; -- 2.20.1 From 72872effdc38ff886532c895a1a4ae1a8e6aa95c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 May 2024 12:57:51 +0200 Subject: [PATCH 03/16] splitter: 1.2.0 - Refined `SplitterApplicationIT` --- .../kafka/wordcount/counter/TestWord.java | 15 ++ .../splitter/SplitterApplicationIT.java | 136 +++++++++++++----- 2 files changed, 112 insertions(+), 39 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java new file mode 100644 index 0000000..f2c2447 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TestWord +{ + String user; + String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index 92dfd10..d1bbc0f 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,8 +1,8 @@ 
package de.juplo.kafka.wordcount.splitter; -import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.counter.TestWord; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.KeyValue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -11,35 +11,44 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; +import java.util.function.BiConsumer; +import java.util.stream.Stream; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.*; +import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord", + "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", + 
"logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.wordcount.splitter.input-topic=" + TOPIC_IN, "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) @Slf4j public class SplitterApplicationIT { public final static String TOPIC_IN = "in"; public final static String TOPIC_OUT = "out"; - static final int PARTITIONS = 2; @Autowired - KafkaTemplate kafkaTemplate; - @Autowired - ObjectMapper mapper; + KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; @@ -50,58 +59,107 @@ public class SplitterApplicationIT consumer.received.clear(); } - - @Test - void testSendMessage() throws Exception + static void writeInputData(BiConsumer consumer) { - Recording recording = new Recording(); + Recording recording; + + recording = new Recording(); recording.setUser("peter"); recording.setSentence("Hallo Welt!"); - kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); + consumer.accept(recording.getUser(), recording); + + recording = new Recording(); recording.setUser("klaus"); recording.setSentence("Müsch gäb's auch!"); - kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); + consumer.accept(recording.getUser(), recording); + + recording = new Recording(); recording.setUser("peter"); recording.setSentence("Boäh, echt! 
ß mal nä Nümmäh!"); - kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); - - String peter1 = mapper.writeValueAsString(Word.of("peter", "Hallo")); - String peter2 = mapper.writeValueAsString(Word.of("peter", "Welt")); - String peter3 = mapper.writeValueAsString(Word.of("peter", "Boäh")); - String peter4 = mapper.writeValueAsString(Word.of("peter", "echt")); - String peter5 = mapper.writeValueAsString(Word.of("peter", "ß")); - String peter6 = mapper.writeValueAsString(Word.of("peter", "mal")); - String peter7 = mapper.writeValueAsString(Word.of("peter", "nä")); - String peter8 = mapper.writeValueAsString(Word.of("peter", "Nümmäh")); - - String klaus1 = mapper.writeValueAsString(Word.of("klaus","Müsch")); - String klaus2 = mapper.writeValueAsString(Word.of("klaus","gäb")); - String klaus3 = mapper.writeValueAsString(Word.of("klaus","s")); - String klaus4 = mapper.writeValueAsString(Word.of("klaus","auch")); + consumer.accept(recording.getUser(), recording); + } + + @Test + void testSendMessage() throws Exception + { + writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); await("Expexted converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> - { - assertThat(consumer.received).hasSize(2); - assertThat(consumer.received.get("klaus")).containsExactly(klaus1, klaus2, klaus3, klaus4); - assertThat(consumer.received.get("peter")).containsExactly(peter1, peter2, peter3, peter4, peter5, peter6, peter7, peter8); - }); + assertExpectedMessages(consumer.getReceivedMessages())); + } + + + static void assertExpectedMessages(MultiValueMap receivedMessages) + { + MultiValueMap expected = new LinkedMultiValueMap<>(); + expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); + await("Received expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> expected.forEach((user, word) -> + assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); } + 
static Stream> expectedMessages = Stream.of( + KeyValue.pair( + "peter", + new TestWord("peter", "Hallo")), + KeyValue.pair( + "peter", + new TestWord("peter", "Welt")), + KeyValue.pair( + "klaus", + new TestWord("klaus", "Müsch")), + KeyValue.pair( + "klaus", + new TestWord("klaus", "gäb")), + KeyValue.pair( + "klaus", + new TestWord("klaus", "s")), + KeyValue.pair( + "klaus", + new TestWord("klaus", "auch")), + KeyValue.pair( + "peter", + new TestWord("peter", "Boäh")), + KeyValue.pair( + "peter", + new TestWord("peter", "echt")), + KeyValue.pair( + "peter", + new TestWord("peter", "ß")), + KeyValue.pair( + "peter", + new TestWord("peter", "mal")), + KeyValue.pair( + "peter", + new TestWord("peter", "nä")), + KeyValue.pair( + "peter", + new TestWord("peter", "Nümmäh"))); + static class Consumer { - final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public void receive(ConsumerRecord record) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) String key, + @Payload TestWord value) { - log.debug("Received message: {}", record); - received.add(record.key(), record.value()); + log.debug("Received message: {}={}", key, value); + received.add(key, value); + } + + synchronized MultiValueMap getReceivedMessages() + { + return received; } } + @TestConfiguration static class Configuration { -- 2.20.1 From d7fa29b35fef537b65e9110f708e781925ebff9f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 May 2024 23:10:08 +0200 Subject: [PATCH 04/16] splitter: 1.2.0 - Separated test-data in class `TestData` -- COPY --- .../kafka/wordcount/splitter/TestData.java | 172 ++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java 
b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java new file mode 100644 index 0000000..d1bbc0f --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -0,0 +1,172 @@ +package de.juplo.kafka.wordcount.splitter; + +import de.juplo.kafka.wordcount.counter.TestWord; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.KeyValue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; +import java.util.function.BiConsumer; +import java.util.stream.Stream; + +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord", + 
"spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.splitter.input-topic=" + TOPIC_IN, + "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT }) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) +@Slf4j +public class SplitterApplicationIT +{ + public final static String TOPIC_IN = "in"; + public final static String TOPIC_OUT = "out"; + + @Autowired + KafkaTemplate kafkaTemplate; + @Autowired + Consumer consumer; + + + @BeforeEach + public void clear() + { + consumer.received.clear(); + } + + static void writeInputData(BiConsumer consumer) + { + Recording recording; + + recording = new Recording(); + recording.setUser("peter"); + recording.setSentence("Hallo Welt!"); + consumer.accept(recording.getUser(), recording); + + recording = new Recording(); + recording.setUser("klaus"); + recording.setSentence("Müsch gäb's auch!"); + consumer.accept(recording.getUser(), recording); + + recording = new Recording(); + recording.setUser("peter"); + recording.setSentence("Boäh, echt! 
ß mal nä Nümmäh!"); + consumer.accept(recording.getUser(), recording); + } + + @Test + void testSendMessage() throws Exception + { + writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); + + await("Expexted converted data") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> + assertExpectedMessages(consumer.getReceivedMessages())); + } + + + static void assertExpectedMessages(MultiValueMap receivedMessages) + { + MultiValueMap expected = new LinkedMultiValueMap<>(); + expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); + await("Received expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> expected.forEach((user, word) -> + assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); + } + + static Stream> expectedMessages = Stream.of( + KeyValue.pair( + "peter", + new TestWord("peter", "Hallo")), + KeyValue.pair( + "peter", + new TestWord("peter", "Welt")), + KeyValue.pair( + "klaus", + new TestWord("klaus", "Müsch")), + KeyValue.pair( + "klaus", + new TestWord("klaus", "gäb")), + KeyValue.pair( + "klaus", + new TestWord("klaus", "s")), + KeyValue.pair( + "klaus", + new TestWord("klaus", "auch")), + KeyValue.pair( + "peter", + new TestWord("peter", "Boäh")), + KeyValue.pair( + "peter", + new TestWord("peter", "echt")), + KeyValue.pair( + "peter", + new TestWord("peter", "ß")), + KeyValue.pair( + "peter", + new TestWord("peter", "mal")), + KeyValue.pair( + "peter", + new TestWord("peter", "nä")), + KeyValue.pair( + "peter", + new TestWord("peter", "Nümmäh"))); + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) String key, + @Payload TestWord value) + { + log.debug("Received message: {}={}", key, value); + received.add(key, value); + } + + synchronized MultiValueMap getReceivedMessages() + 
{ + return received; + } + } + + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +} -- 2.20.1 From 52831df8733700f29fd2e430d7a8912c26f22c08 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 May 2024 23:19:41 +0200 Subject: [PATCH 05/16] splitter: 1.2.0 - Separated test-data in class `TestData` -- ALIGN --- .../splitter/SplitterApplicationIT.java | 77 +--------------- .../kafka/wordcount/splitter/TestData.java | 89 +------------------ 2 files changed, 3 insertions(+), 163 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index d1bbc0f..adf4dde 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.splitter; import de.juplo.kafka.wordcount.counter.TestWord; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.KeyValue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -19,11 +18,8 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.function.BiConsumer; -import java.util.stream.Stream; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; -import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -59,87 +55,18 @@ public class SplitterApplicationIT consumer.received.clear(); } - static void writeInputData(BiConsumer consumer) - { - Recording recording; - - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Hallo Welt!"); - consumer.accept(recording.getUser(), recording); - - recording = new Recording(); - 
recording.setUser("klaus"); - recording.setSentence("Müsch gäb's auch!"); - consumer.accept(recording.getUser(), recording); - - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Boäh, echt! ß mal nä Nümmäh!"); - consumer.accept(recording.getUser(), recording); - } - @Test void testSendMessage() throws Exception { - writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); + TestData.writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); await("Expexted converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> - assertExpectedMessages(consumer.getReceivedMessages())); + TestData.assertExpectedMessages(consumer.getReceivedMessages())); } - static void assertExpectedMessages(MultiValueMap receivedMessages) - { - MultiValueMap expected = new LinkedMultiValueMap<>(); - expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); - await("Received expected messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> expected.forEach((user, word) -> - assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); - } - - static Stream> expectedMessages = Stream.of( - KeyValue.pair( - "peter", - new TestWord("peter", "Hallo")), - KeyValue.pair( - "peter", - new TestWord("peter", "Welt")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "Müsch")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "gäb")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "s")), - KeyValue.pair( - "klaus", - new TestWord("klaus", "auch")), - KeyValue.pair( - "peter", - new TestWord("peter", "Boäh")), - KeyValue.pair( - "peter", - new TestWord("peter", "echt")), - KeyValue.pair( - "peter", - new TestWord("peter", "ß")), - KeyValue.pair( - "peter", - new TestWord("peter", "mal")), - KeyValue.pair( - "peter", - new TestWord("peter", "nä")), - KeyValue.pair( - "peter", - new TestWord("peter", "Nümmäh"))); - - static class Consumer { private final 
MultiValueMap received = new LinkedMultiValueMap<>(); diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index d1bbc0f..00ea4c4 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -1,20 +1,7 @@ package de.juplo.kafka.wordcount.splitter; import de.juplo.kafka.wordcount.counter.TestWord; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KeyValue; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -22,43 +9,12 @@ import java.time.Duration; import java.util.function.BiConsumer; import java.util.stream.Stream; -import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -@SpringBootTest( - properties = { - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.add.type.headers=false", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", 
- "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.splitter.input-topic=" + TOPIC_IN, - "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) -@Slf4j -public class SplitterApplicationIT +public class TestData { - public final static String TOPIC_IN = "in"; - public final static String TOPIC_OUT = "out"; - - @Autowired - KafkaTemplate kafkaTemplate; - @Autowired - Consumer consumer; - - - @BeforeEach - public void clear() - { - consumer.received.clear(); - } - static void writeInputData(BiConsumer consumer) { Recording recording; @@ -79,18 +35,6 @@ public class SplitterApplicationIT consumer.accept(recording.getUser(), recording); } - @Test - void testSendMessage() throws Exception - { - writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); - - await("Expexted converted data") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> - assertExpectedMessages(consumer.getReceivedMessages())); - } - - static void assertExpectedMessages(MultiValueMap receivedMessages) { MultiValueMap expected = new LinkedMultiValueMap<>(); @@ -138,35 +82,4 @@ public class SplitterApplicationIT KeyValue.pair( "peter", new TestWord("peter", "Nümmäh"))); - - - static class Consumer - { - private final MultiValueMap received = new LinkedMultiValueMap<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String key, - @Payload TestWord value) - { - log.debug("Received message: {}={}", key, value); - received.add(key, value); - } - - synchronized MultiValueMap getReceivedMessages() - { - return received; - } - } - - 
- @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - } } -- 2.20.1 From b1db057d3c58d20620482ff68eb210b309e615fe Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 17:28:55 +0200 Subject: [PATCH 06/16] splitter: 1.2.0 - Introduced `TestWord.of(..)` --- .../kafka/wordcount/counter/TestWord.java | 2 +- .../kafka/wordcount/splitter/TestData.java | 24 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java index f2c2447..0453671 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java @@ -7,7 +7,7 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor -@AllArgsConstructor +@AllArgsConstructor(staticName = "of") public class TestWord { String user; diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index 00ea4c4..89cebaa 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -48,38 +48,38 @@ public class TestData static Stream> expectedMessages = Stream.of( KeyValue.pair( "peter", - new TestWord("peter", "Hallo")), + TestWord.of("peter", "Hallo")), KeyValue.pair( "peter", - new TestWord("peter", "Welt")), + TestWord.of("peter", "Welt")), KeyValue.pair( "klaus", - new TestWord("klaus", "Müsch")), + TestWord.of("klaus", "Müsch")), KeyValue.pair( "klaus", - new TestWord("klaus", "gäb")), + TestWord.of("klaus", "gäb")), KeyValue.pair( "klaus", - new TestWord("klaus", "s")), + TestWord.of("klaus", "s")), KeyValue.pair( "klaus", - new TestWord("klaus", "auch")), + TestWord.of("klaus", "auch")), KeyValue.pair( "peter", - new TestWord("peter", "Boäh")), + TestWord.of("peter", "Boäh")), 
KeyValue.pair( "peter", - new TestWord("peter", "echt")), + TestWord.of("peter", "echt")), KeyValue.pair( "peter", - new TestWord("peter", "ß")), + TestWord.of("peter", "ß")), KeyValue.pair( "peter", - new TestWord("peter", "mal")), + TestWord.of("peter", "mal")), KeyValue.pair( "peter", - new TestWord("peter", "nä")), + TestWord.of("peter", "nä")), KeyValue.pair( "peter", - new TestWord("peter", "Nümmäh")), + TestWord.of("peter", "Nümmäh"))); } -- 2.20.1 From cfed6b631771687b645375394c422ec0c02720f5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 17:28:55 +0200 Subject: [PATCH 07/16] splitter: 1.2.0 - Introduced `TestRecording` * `SplitterApplicationIT` uses the separate class `TestRecording` to represent the input data that comes from the foreign domain. --- .../wordcount/recorder/TestRecording.java | 15 ++++++++ .../splitter/SplitterApplicationIT.java | 3 +- .../kafka/wordcount/splitter/TestData.java | 38 ++++++++++--------- 3 files changed, 38 insertions(+), 18 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/recorder/TestRecording.java diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/TestRecording.java b/src/test/java/de/juplo/kafka/wordcount/recorder/TestRecording.java new file mode 100644 index 0000000..2d09896 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/TestRecording.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.recorder; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestRecording +{ + String user; + String sentence; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index adf4dde..a702e1d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ 
b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,6 +1,7 @@ package de.juplo.kafka.wordcount.splitter; import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.recorder.TestRecording; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,7 +45,7 @@ public class SplitterApplicationIT public final static String TOPIC_OUT = "out"; @Autowired - KafkaTemplate kafkaTemplate; + KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index 89cebaa..2af8644 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -1,6 +1,7 @@ package de.juplo.kafka.wordcount.splitter; import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.recorder.TestRecording; import org.apache.kafka.streams.KeyValue; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -15,26 +16,29 @@ import static org.awaitility.Awaitility.await; public class TestData { - static void writeInputData(BiConsumer consumer) - { - Recording recording; - - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Hallo Welt!"); - consumer.accept(recording.getUser(), recording); - - recording = new Recording(); - recording.setUser("klaus"); - recording.setSentence("Müsch gäb's auch!"); - consumer.accept(recording.getUser(), recording); + static final String PETER = "peter"; + static final String KLAUS = "klaus"; - recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Boäh, echt! 
ß mal nä Nümmäh!"); - consumer.accept(recording.getUser(), recording); + static void writeInputData(BiConsumer consumer) + { + Stream + .of(INPUT_MESSAGES) + .forEach(kv -> consumer.accept(kv.key, kv.value)); } + static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + new KeyValue<>( + PETER, + TestRecording.of(PETER, "Hallo Welt!")), + new KeyValue<>( + KLAUS, + TestRecording.of(KLAUS, "Müsch gäb's auch!")), + new KeyValue<>( + PETER, + TestRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")), + }; + static void assertExpectedMessages(MultiValueMap receivedMessages) { MultiValueMap expected = new LinkedMultiValueMap<>(); -- 2.20.1 From c9663d885810c20447e31d48f1d5b3de3b90e229 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 13:44:28 +0200 Subject: [PATCH 08/16] splitter: 1.2.0 - `TestData.assertExpectedResult(..)` --- .../kafka/wordcount/splitter/TestData.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index 2af8644..e2bf9cc 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -41,15 +41,14 @@ public class TestData static void assertExpectedMessages(MultiValueMap receivedMessages) { - MultiValueMap expected = new LinkedMultiValueMap<>(); - expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); await("Received expected messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> expected.forEach((user, word) -> + .untilAsserted(() -> expectedMessages().forEach((user, word) -> assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); } - static Stream> expectedMessages = Stream.of( + static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { KeyValue.pair( "peter", TestWord.of("peter", "Hallo")), @@ -85,5 +84,15 @@ public class TestData 
TestWord.of("peter", "nä")), KeyValue.pair( "peter", - TestWord.of("peter", "Nümmäh"))); + TestWord.of("peter", "Nümmäh")), + }; + + static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } } -- 2.20.1 From 5283e1688c50764d828f83f7d58d4cf1c7e56cf9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 13:44:28 +0200 Subject: [PATCH 09/16] splitter: 1.2.0 - Refined creation of input in `SplitterApplicationIT` --- .../splitter/SplitterApplicationIT.java | 38 ++++++++++++++----- .../kafka/wordcount/splitter/TestData.java | 7 ---- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index a702e1d..743c06e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -3,7 +3,7 @@ package de.juplo.kafka.wordcount.splitter; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.recorder.TestRecording; import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -12,6 +12,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.messaging.handler.annotation.Header; 
import org.springframework.messaging.handler.annotation.Payload; @@ -19,8 +20,10 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; +import java.util.stream.Stream; -import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_OUT; import static org.awaitility.Awaitility.await; @@ -44,23 +47,38 @@ public class SplitterApplicationIT public final static String TOPIC_IN = "in"; public final static String TOPIC_OUT = "out"; - @Autowired - KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; - - @BeforeEach - public void clear() + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) { - consumer.received.clear(); + Stream + .of(TestData.INPUT_MESSAGES) + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); } + @Test void testSendMessage() throws Exception { - TestData.writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); - await("Expexted converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index e2bf9cc..8c343d5 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -7,7 +7,6 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; 
import java.time.Duration; -import java.util.function.BiConsumer; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -19,12 +18,6 @@ public class TestData static final String PETER = "peter"; static final String KLAUS = "klaus"; - static void writeInputData(BiConsumer consumer) - { - Stream - .of(INPUT_MESSAGES) - .forEach(kv -> consumer.accept(kv.key, kv.value)); - } static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { -- 2.20.1 From 53b03696e9e3890d9aeca795482cf66252449fad Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 18:03:15 +0200 Subject: [PATCH 10/16] splitter: 1.2.0 - Refined `TestData` clearified concerns --- .../kafka/wordcount/splitter/SplitterApplicationIT.java | 5 ++--- .../java/de/juplo/kafka/wordcount/splitter/TestData.java | 9 +++++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index 743c06e..b91af13 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -20,7 +20,6 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.stream.Stream; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_IN; import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_OUT; @@ -54,8 +53,8 @@ public class SplitterApplicationIT public static void testSendMessage( @Autowired KafkaTemplate kafkaTemplate) { - Stream - .of(TestData.INPUT_MESSAGES) + TestData + .getInputMessages() .forEach(kv -> { try diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index 8c343d5..a3f7575 100644 --- 
a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -19,7 +19,12 @@ public class TestData static final String KLAUS = "klaus"; - static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + static final Stream> getInputMessages() + { + return Stream.of(INPUT_MESSAGES); + } + + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( PETER, @@ -40,7 +45,7 @@ public class TestData assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); } - static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( "peter", -- 2.20.1 From fa2cbce197b202b7ea32b6540c3b78fd80f4dd8e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 18:36:13 +0200 Subject: [PATCH 11/16] splitter: 1.2.0 - Renamed classes for test-input/-output -- COPY --- .../wordcount/counter/{TestWord.java => TestOutputWord.java} | 0 .../recorder/{TestRecording.java => TestInputRecording.java} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename src/test/java/de/juplo/kafka/wordcount/counter/{TestWord.java => TestOutputWord.java} (100%) rename src/test/java/de/juplo/kafka/wordcount/recorder/{TestRecording.java => TestInputRecording.java} (100%) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputWord.java similarity index 100% rename from src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java rename to src/test/java/de/juplo/kafka/wordcount/counter/TestOutputWord.java diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/TestRecording.java b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputRecording.java similarity index 100% rename from src/test/java/de/juplo/kafka/wordcount/recorder/TestRecording.java rename to src/test/java/de/juplo/kafka/wordcount/recorder/TestInputRecording.java -- 2.20.1 
From 47ed803476f547b79439b510d409c81d7d85db53 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 18:36:26 +0200 Subject: [PATCH 12/16] splitter: 1.2.0 - Renamed classes for test-input/-output -- ALIGN --- .../wordcount/counter/TestOutputWord.java | 2 +- .../recorder/TestInputRecording.java | 2 +- .../splitter/SplitterApplicationIT.java | 14 +++--- .../kafka/wordcount/splitter/TestData.java | 46 +++++++++---------- 4 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputWord.java index 0453671..7dcc66a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputWord.java @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") -public class TestWord +public class TestOutputWord { String user; String word; diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputRecording.java b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputRecording.java index 2d09896..a62b1b4 100644 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputRecording.java +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputRecording.java @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") -public class TestRecording +public class TestInputRecording { String user; String sentence; diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index b91af13..891a435 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,7 +1,7 @@ package de.juplo.kafka.wordcount.splitter; -import 
de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.recorder.TestRecording; +import de.juplo.kafka.wordcount.counter.TestOutputWord; +import de.juplo.kafka.wordcount.recorder.TestInputRecording; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -51,7 +51,7 @@ public class SplitterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -59,7 +59,7 @@ public class SplitterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -87,18 +87,18 @@ public class SplitterApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( @Header(KafkaHeaders.RECEIVED_KEY) String key, - @Payload TestWord value) + @Payload TestOutputWord value) { log.debug("Received message: {}={}", key, value); received.add(key, value); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; } diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index a3f7575..feedb1e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -1,7 +1,7 @@ package de.juplo.kafka.wordcount.splitter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.recorder.TestRecording; +import de.juplo.kafka.wordcount.counter.TestOutputWord; +import 
de.juplo.kafka.wordcount.recorder.TestInputRecording; import org.apache.kafka.streams.KeyValue; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -19,25 +19,25 @@ public class TestData static final String KLAUS = "klaus"; - static final Stream> getInputMessages() + static final Stream> getInputMessages() { return Stream.of(INPUT_MESSAGES); } - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( PETER, - TestRecording.of(PETER, "Hallo Welt!")), + TestInputRecording.of(PETER, "Hallo Welt!")), new KeyValue<>( KLAUS, - TestRecording.of(KLAUS, "Müsch gäb's auch!")), + TestInputRecording.of(KLAUS, "Müsch gäb's auch!")), new KeyValue<>( PETER, - TestRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")), + TestInputRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { await("Received expected messages") .atMost(Duration.ofSeconds(5)) @@ -45,49 +45,49 @@ public class TestData assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); } - private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( "peter", - TestWord.of("peter", "Hallo")), + TestOutputWord.of("peter", "Hallo")), KeyValue.pair( "peter", - TestWord.of("peter", "Welt")), + TestOutputWord.of("peter", "Welt")), KeyValue.pair( "klaus", - TestWord.of("klaus", "Müsch")), + TestOutputWord.of("klaus", "Müsch")), KeyValue.pair( "klaus", - TestWord.of("klaus", "gäb")), + TestOutputWord.of("klaus", "gäb")), KeyValue.pair( "klaus", - TestWord.of("klaus", "s")), + TestOutputWord.of("klaus", "s")), KeyValue.pair( "klaus", - TestWord.of("klaus", "auch")), + TestOutputWord.of("klaus", "auch")), KeyValue.pair( "peter", - 
TestWord.of("peter", "Boäh")), + TestOutputWord.of("peter", "Boäh")), KeyValue.pair( "peter", - TestWord.of("peter", "echt")), + TestOutputWord.of("peter", "echt")), KeyValue.pair( "peter", - TestWord.of("peter", "ß")), + TestOutputWord.of("peter", "ß")), KeyValue.pair( "peter", - TestWord.of("peter", "mal")), + TestOutputWord.of("peter", "mal")), KeyValue.pair( "peter", - TestWord.of("peter", "nä")), + TestOutputWord.of("peter", "nä")), KeyValue.pair( "peter", - TestWord.of("peter", "Nümmäh")), + TestOutputWord.of("peter", "Nümmäh")), }; - static MultiValueMap expectedMessages() + static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); -- 2.20.1 From b95a6dae1b668f87ec14d0ace9b768ca89e338b3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Jun 2024 18:41:02 +0200 Subject: [PATCH 13/16] splitter: 1.2.0 - A domain-class (``User``) is used as key --- .../SplitterApplicationConfiguration.java | 3 +- .../splitter/SplitterStreamProcessor.java | 2 +- .../juplo/kafka/wordcount/splitter/User.java | 12 ++++ .../wordcount/counter/TestOutputUser.java | 14 ++++ .../wordcount/recorder/TestInputUser.java | 14 ++++ .../splitter/SplitterApplicationIT.java | 20 +++--- .../kafka/wordcount/splitter/TestData.java | 72 ++++++++++--------- 7 files changed, 92 insertions(+), 45 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/splitter/User.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java index ead41f8..7143e1a 100644 --- 
a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java @@ -33,9 +33,10 @@ public class SplitterApplicationConfiguration propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, SplitterApplication.class.getName()); + propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName()); propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 60c569b..d0070c0 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -25,7 +25,7 @@ public class SplitterStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(inputTopic); + KStream source = builder.stream(inputTopic); source .flatMapValues(recording -> Arrays diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/User.java b/src/main/java/de/juplo/kafka/wordcount/splitter/User.java new file mode 100644 index 0000000..8a65695 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/User.java @@ -0,0 +1,12 @@ +package 
de.juplo.kafka.wordcount.splitter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class User +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java new file mode 100644 index 0000000..4406b3b --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestOutputUser +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java new file mode 100644 index 0000000..ce413ba --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.recorder; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestInputUser +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index 891a435..e945b31 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,7 +1,9 @@ package de.juplo.kafka.wordcount.splitter; +import de.juplo.kafka.wordcount.counter.TestOutputUser; import de.juplo.kafka.wordcount.counter.TestOutputWord; import de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; import 
lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -28,12 +30,14 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.TestOutputUser", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestOutputWord", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", @@ -51,7 +55,7 @@ public class SplitterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -59,7 +63,7 @@ public class SplitterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -78,7 +82,7 @@ public class SplitterApplicationIT @Test void testSendMessage() throws Exception { - await("Expexted converted data") + await("Expected converted 
data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); @@ -87,18 +91,18 @@ public class SplitterApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String key, + @Header(KafkaHeaders.RECEIVED_KEY) TestOutputUser key, @Payload TestOutputWord value) { log.debug("Received message: {}={}", key, value); received.add(key, value); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; } diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index feedb1e..f89b099 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -1,7 +1,9 @@ package de.juplo.kafka.wordcount.splitter; +import de.juplo.kafka.wordcount.counter.TestOutputUser; import de.juplo.kafka.wordcount.counter.TestOutputWord; import de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; import org.apache.kafka.streams.KeyValue; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -15,29 +17,29 @@ import static org.awaitility.Awaitility.await; public class TestData { - static final String PETER = "peter"; - static final String KLAUS = "klaus"; + static final TestInputUser PETER = TestInputUser.of("peter"); + static final TestInputUser KLAUS = TestInputUser.of("klaus"); - static final Stream> getInputMessages() + static final Stream> getInputMessages() { return Stream.of(INPUT_MESSAGES); } - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + 
private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( PETER, - TestInputRecording.of(PETER, "Hallo Welt!")), + TestInputRecording.of(PETER.getUser(), "Hallo Welt!")), new KeyValue<>( KLAUS, - TestInputRecording.of(KLAUS, "Müsch gäb's auch!")), + TestInputRecording.of(KLAUS.getUser(), "Müsch gäb's auch!")), new KeyValue<>( PETER, - TestInputRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")), + TestInputRecording.of(PETER.getUser(), "Boäh, echt! ß mal nä Nümmäh!")), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { await("Received expected messages") .atMost(Duration.ofSeconds(5)) @@ -45,49 +47,49 @@ public class TestData assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); } - private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( - "peter", - TestOutputWord.of("peter", "Hallo")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "Hallo")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "Welt")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "Welt")), KeyValue.pair( - "klaus", - TestOutputWord.of("klaus", "Müsch")), + TestOutputUser.of(KLAUS.getUser()), + TestOutputWord.of(KLAUS.getUser(), "Müsch")), KeyValue.pair( - "klaus", - TestOutputWord.of("klaus", "gäb")), + TestOutputUser.of(KLAUS.getUser()), + TestOutputWord.of(KLAUS.getUser(), "gäb")), KeyValue.pair( - "klaus", - TestOutputWord.of("klaus", "s")), + TestOutputUser.of(KLAUS.getUser()), + TestOutputWord.of(KLAUS.getUser(), "s")), KeyValue.pair( - "klaus", - TestOutputWord.of("klaus", "auch")), + TestOutputUser.of(KLAUS.getUser()), + TestOutputWord.of(KLAUS.getUser(), "auch")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "Boäh")), + TestOutputUser.of(PETER.getUser()), + 
TestOutputWord.of(PETER.getUser(), "Boäh")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "echt")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "echt")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "ß")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "ß")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "mal")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "mal")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "nä")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "nä")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "Nümmäh")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "Nümmäh")), }; - static MultiValueMap expectedMessages() + static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); -- 2.20.1 From d25dc9ec4152f4d2de60abbd387c9c57e935135b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 4 Jun 2024 23:26:38 +0200 Subject: [PATCH 14/16] splitter: 1.2.0 - Refined `SplitterApplicationConfiguration` * Separated the configuration for serialization/deserialization in a static method. 
--- .../SplitterApplicationConfiguration.java | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java index 7143e1a..92537b2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.splitter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -27,24 +26,13 @@ public class SplitterApplicationConfiguration @Bean(initMethod = "start", destroyMethod = "stop") public SplitterStreamProcessor streamProcessor( SplitterApplicationProperties properties, + Properties streamProcessorProperties, ConfigurableApplicationContext context) { - Properties propertyMap = new Properties(); - - propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, SplitterApplication.class.getName()); - propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); - propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName()); - propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); - propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - 
SplitterStreamProcessor streamProcessor = new SplitterStreamProcessor( properties.getInputTopic(), properties.getOutputTopic(), - propertyMap); + streamProcessorProperties); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -59,4 +47,30 @@ public class SplitterApplicationConfiguration return streamProcessor; } + + @Bean + public Properties streamProcessorProperties( + SplitterApplicationProperties applicationProperties) + { + Properties properties = serializationConfig(); + + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationProperties.getApplicationId()); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return properties; + } + + static Properties serializationConfig() + { + Properties properties = new Properties(); + + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + properties.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); + properties.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName()); + properties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + + return properties; + } } -- 2.20.1 From 82a2e30072861bc8a3c19d51ebca158a3331b5d9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 4 Jun 2024 23:31:05 +0200 Subject: [PATCH 15/16] splitter: 1.2.0 - Introduced `SplitterStreamProcessor.buildTopology(..)` --- .../splitter/SplitterStreamProcessor.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index d0070c0..fa84665 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ 
b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.splitter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; @@ -22,6 +23,14 @@ public class SplitterStreamProcessor String inputTopic, String outputTopic, Properties properties) + { + Topology topology = buildTopology(inputTopic, outputTopic); + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic) { StreamsBuilder builder = new StreamsBuilder(); @@ -34,7 +43,10 @@ public class SplitterStreamProcessor .toList()) .to(outputTopic); - streams = new KafkaStreams(builder.build(), properties); + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; } public void start() -- 2.20.1 From 836368acb5e435a733df5893e477a406daeafcb3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 3 Jun 2024 18:01:55 +0200 Subject: [PATCH 16/16] splitter: 1.2.0 - Added `SplitterStreamProcessorTopologyTest` --- .../SplitterStreamProcessorTopologyTest.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java new file mode 100644 index 0000000..5e2d729 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessorTopologyTest.java @@ -0,0 +1,73 @@ +package de.juplo.kafka.wordcount.splitter; + +import de.juplo.kafka.wordcount.counter.TestOutputUser; +import de.juplo.kafka.wordcount.counter.TestOutputWord; +import 
de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationConfiguration.serializationConfig; + + +@Slf4j +public class SplitterStreamProcessorTopologyTest +{ + public final static String IN = "TEST-IN"; + public final static String OUT = "TEST-OUT"; + + + TopologyTestDriver testDriver; + TestInputTopic in; + TestOutputTopic out; + + + @BeforeEach + public void setUp() + { + testDriver = new TopologyTestDriver( + SplitterStreamProcessor.buildTopology(IN, OUT), + serializationConfig()); + + in = testDriver.createInputTopic( + IN, + new JsonSerializer().noTypeInfo(), + new JsonSerializer().noTypeInfo()); + + out = testDriver.createOutputTopic( + OUT, + new JsonDeserializer().copyWithType(TestOutputUser.class), + new JsonDeserializer().copyWithType(TestOutputWord.class)); + } + + + @Test + public void test() + { + TestData + .getInputMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); + + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + out + .readRecordsToList() + .forEach(record -> receivedMessages.add(record.key(), record.value())); + + TestData.assertExpectedMessages(receivedMessages); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } +} -- 2.20.1