From cbabb637d0332ef3d8c67faf63e6e6897a69977f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 4 Feb 2023 11:13:50 +0100 Subject: [PATCH] splitter: 1.1.0 - Input/Output is expected/read as JSON - The input is mapped to the class `Recording` - The output is mapped as class `Word` - Added a test, that asserts, that the expected input can be read and the intended output is written. --- pom.xml | 22 ++++- .../kafka/wordcount/splitter/Recording.java | 13 +++ .../splitter/SplitterStreamProcessor.java | 22 ++++- .../juplo/kafka/wordcount/splitter/Word.java | 11 +++ .../splitter/SplitterApplicationTests.java | 97 +++++++++++++++++++ 5 files changed, 160 insertions(+), 5 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/splitter/Recording.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/splitter/Word.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java diff --git a/pom.xml b/pom.xml index 8c1ea6f..abe3a67 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount splitter - 1.0.1 + 1.1.0 Wordcount-Splitter Stream-processor of the multi-user wordcount-example, that splits the sentences up into single words @@ -29,6 +29,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot @@ -46,11 +50,27 @@ lombok true + org.springframework.boot spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/Recording.java b/src/main/java/de/juplo/kafka/wordcount/splitter/Recording.java new file mode 100644 index 0000000..19adb0d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/Recording.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.splitter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Recording +{ + private String user; + private String sentence; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 3eca8f4..12816ab 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -6,9 +6,12 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.kafka.support.serializer.JsonSerde; import org.springframework.stereotype.Component; import jakarta.annotation.PostConstruct; @@ -36,16 +39,27 @@ public class SplitterStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(properties.getInputTopic()); + JsonSerde recordSerde = + new JsonSerde<>(Recording.class).ignoreTypeHeaders(); + JsonSerde wordSerde = + new JsonSerde<>(Word.class).noTypeInfo(); + + KStream source = builder.stream( + properties.getInputTopic(), + Consumed.with(Serdes.String(), recordSerde)); + source - .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) - .to(properties.getOutputTopic()); + .flatMapValues(recording -> Arrays + .stream(PATTERN.split(recording.getSentence())) + .map(word -> Word.of(recording.getUser(), word)) + .toList()) + .to(properties.getOutputTopic(), Produced.with(Serdes.String(), wordSerde)); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streams = new KafkaStreams(builder.build(), props); diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/Word.java b/src/main/java/de/juplo/kafka/wordcount/splitter/Word.java new file mode 100644 index 0000000..94eb539 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/Word.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.Value; + + +@Value(staticConstructor = "of") +public class Word +{ + String user; + String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java new file mode 100644 index 0000000..9b1960f --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java @@ -0,0 +1,97 @@ +package de.juplo.kafka.wordcount.splitter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationTests.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.*; + + +@SpringBootTest( + properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.splitter.input-topic=" + TOPIC_IN, + "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT }) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) +@Slf4j +public class SplitterApplicationTests +{ + public final static String TOPIC_IN = "in"; + public final static String TOPIC_OUT = "out"; + static final int PARTITIONS = 2; + + @Autowired + KafkaTemplate kafkaTemplate; + @Autowired + ObjectMapper mapper; + @Autowired + Consumer consumer; + + + @BeforeEach + public void clear() + { + consumer.received.clear(); + } + + + @Test + void testSendMessage() throws Exception + { + Recording recording = new Recording(); + recording.setUser("peter"); + recording.setSentence("Hallo Welt!"); + kafkaTemplate.send(TOPIC_IN, mapper.writeValueAsString(recording)); + + String word1 = mapper.writeValueAsString(Word.of("peter", "Hallo")); + String word2 = mapper.writeValueAsString(Word.of("peter", "Welt")); + + await("Expexted converted data") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> + { + assertThat(consumer.received).hasSize(2); + assertThat(consumer.received.get(0).value()).isEqualTo(word1); + assertThat(consumer.received.get(1).value()).isEqualTo(word2); + }); + } + + + static class Consumer + { + final List> received = new LinkedList<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public void receive(ConsumerRecord record) + { + log.debug("Received message: {}", record); + received.add(record); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +} -- 2.20.1