From: Kai Moritz Date: Sun, 26 May 2024 20:47:45 +0000 (+0200) Subject: WIP X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fsplitter;hp=520528638e7487c845d7fb0f39066ce7d249297b;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/Dockerfile b/Dockerfile index 803477f..dd18a47 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM openjdk:11-jre-slim +FROM eclipse-temurin:17-jre COPY target/*.jar /opt/app.jar EXPOSE 8086 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index 4018e22..fea07a7 100644 --- a/pom.xml +++ b/pom.xml @@ -5,16 +5,16 @@ org.springframework.boot spring-boot-starter-parent - 3.0.2 + 3.2.5 de.juplo.kafka.wordcount splitter - 1.1.1 + 1.1.6 Wordcount-Splitter Stream-processor of the multi-user wordcount-example, that splits the sentences up into single words - 0.33.0 + 0.44.0 @@ -75,6 +75,9 @@ + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 491c549..5b9d91d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -2,11 +2,9 @@ package de.juplo.kafka.wordcount.splitter; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication -@EnableConfigurationProperties(SplitterApplicationProperties.class) public class SplitterApplication { public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java new file mode 100644 index 0000000..ead41f8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java @@ -0,0 +1,61 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(SplitterApplicationProperties.class) +@Slf4j +public class SplitterApplicationConfiguration +{ + @Bean(initMethod = "start", destroyMethod = "stop") + public SplitterStreamProcessor streamProcessor( + SplitterApplicationProperties properties, + ConfigurableApplicationContext context) + { + Properties propertyMap = new Properties(); + + propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, SplitterApplication.class.getName()); + propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName()); + propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + SplitterStreamProcessor streamProcessor = new SplitterStreamProcessor( + properties.getInputTopic(), + properties.getOutputTopic(), + propertyMap); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + return streamProcessor; + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 66188a7..60c569b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -1,31 +1,16 @@ package de.juplo.kafka.wordcount.splitter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.stereotype.Component; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import java.util.Arrays; import java.util.Properties; -import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - @Slf4j -@Component public class SplitterStreamProcessor { final static Pattern PATTERN = Pattern.compile("[^\\p{IsAlphabetic}]+"); @@ -34,51 +19,30 @@ public class SplitterStreamProcessor public SplitterStreamProcessor( - SplitterApplicationProperties properties, - ConfigurableApplicationContext context) + String inputTopic, + String outputTopic, + Properties properties) { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(properties.getInputTopic()); + KStream source = builder.stream(inputTopic); source .flatMapValues(recording -> Arrays .stream(PATTERN.split(recording.getSentence())) .map(word -> Word.of(recording.getUser(), word)) .toList()) - .to(properties.getOutputTopic()); - - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.TRUSTED_PACKAGES, Recording.class.getName() ); - props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName()); - props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + .to(outputTopic); - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); + streams = new KafkaStreams(builder.build(), properties); } - @PostConstruct public void start() { log.info("Starting Stream-Processor"); streams.start(); } - @PreDestroy public void stop() { log.info("Stopping Stream-Processor"); diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java new file mode 100644 index 0000000..ce8f506 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -0,0 +1,172 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.KeyValue; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; +import java.util.function.BiConsumer; +import java.util.stream.Stream; + +import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false ", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.WordData", + "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.splitter.input-topic=" + TOPIC_IN, + "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT }) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) +@Slf4j +public class SplitterApplicationIT +{ + public final static String TOPIC_IN = "in"; + public final static String TOPIC_OUT = "out"; + static final int PARTITIONS = 2; + + @Autowired + KafkaTemplate kafkaTemplate; + @Autowired + Consumer consumer; + + + @BeforeEach + public void clear() + { + consumer.received.clear(); + } + + static void writeInputData(BiConsumer consumer) + { + Recording recording; + + recording = new Recording(); + recording.setUser("peter"); + recording.setSentence("Hallo Welt!"); + consumer.accept(recording.getUser(), recording); + + recording = new Recording(); + recording.setUser("klaus"); + recording.setSentence("Müsch gäb's auch!"); + consumer.accept(recording.getUser(), recording); + + recording = new Recording(); + recording.setUser("peter"); + recording.setSentence("Boäh, echt! ß mal nä Nümmäh!"); + consumer.accept(recording.getUser(), recording); + } + + @Test + void testSendMessage() throws Exception + { + writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording)); + + await("Expexted converted data") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> + assertExpectedResult(consumer.getReceivedMessages())); + } + + + static void assertExpectedResult(MultiValueMap receivedMessages) + { + MultiValueMap expected = new LinkedMultiValueMap<>(); + expectedMessages.forEach(keyValue -> expected.add(keyValue.key, keyValue.value)); + await("Received expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> expected.forEach((user, word) -> + assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); + } + + static Stream> expectedMessages = Stream.of( + KeyValue.pair( + "peter", + new WordData("peter", "Hallo")), + KeyValue.pair( + "peter", + new WordData("peter", "Welt")), + KeyValue.pair( + "klaus", + new WordData("klaus","Müsch")), + KeyValue.pair( + "klaus", + new WordData("klaus","gäb")), + KeyValue.pair( + "klaus", + new WordData("klaus","s")), + KeyValue.pair( + "klaus", + new WordData("klaus","auch")), + KeyValue.pair( + "peter", + new WordData("peter", "Boäh")), + KeyValue.pair( + "peter", + new WordData("peter", "echt")), + KeyValue.pair( + "peter", + new WordData("peter", "ß")), + KeyValue.pair( + "peter", + new WordData("peter", "mal")), + KeyValue.pair( + "peter", + new WordData("peter", "nä")), + KeyValue.pair( + "peter", + new WordData("peter", "Nümmäh"))); + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) String key, + @Payload WordData value) + { + log.debug("Received message: {}={}", key, value); + received.add(key, value); + } + + synchronized MultiValueMap getReceivedMessages() + { + return received; + } + } + + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java deleted file mode 100644 index 3fa5851..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationTests.java +++ /dev/null @@ -1,114 +0,0 @@ -package de.juplo.kafka.wordcount.splitter; - -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.time.Duration; - -import static de.juplo.kafka.wordcount.splitter.SplitterApplicationTests.*; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.*; - - -@SpringBootTest( - properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.splitter.input-topic=" + TOPIC_IN, - "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) -@Slf4j -public class SplitterApplicationTests -{ - public final static String TOPIC_IN = "in"; - public final static String TOPIC_OUT = "out"; - static final int PARTITIONS = 2; - - @Autowired - KafkaTemplate kafkaTemplate; - @Autowired - ObjectMapper mapper; - @Autowired - Consumer consumer; - - - @BeforeEach - public void clear() - { - consumer.received.clear(); - } - - - @Test - void testSendMessage() throws Exception - { - Recording recording = new Recording(); - recording.setUser("peter"); - recording.setSentence("Hallo Welt!"); - kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); - recording.setUser("klaus"); - recording.setSentence("Müsch gäb's auch!"); - kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); - recording.setUser("peter"); - recording.setSentence("Boäh, echt! ß mal nä Nümmäh!"); - kafkaTemplate.send(TOPIC_IN, recording.getUser(), mapper.writeValueAsString(recording)); - - String peter1 = mapper.writeValueAsString(Word.of("peter", "Hallo")); - String peter2 = mapper.writeValueAsString(Word.of("peter", "Welt")); - String peter3 = mapper.writeValueAsString(Word.of("peter", "Boäh")); - String peter4 = mapper.writeValueAsString(Word.of("peter", "echt")); - String peter5 = mapper.writeValueAsString(Word.of("peter", "ß")); - String peter6 = mapper.writeValueAsString(Word.of("peter", "mal")); - String peter7 = mapper.writeValueAsString(Word.of("peter", "nä")); - String peter8 = mapper.writeValueAsString(Word.of("peter", "Nümmäh")); - - String klaus1 = mapper.writeValueAsString(Word.of("klaus","Müsch")); - String klaus2 = mapper.writeValueAsString(Word.of("klaus","gäb")); - String klaus3 = mapper.writeValueAsString(Word.of("klaus","s")); - String klaus4 = mapper.writeValueAsString(Word.of("klaus","auch")); - - await("Expexted converted data") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> - { - assertThat(consumer.received).hasSize(2); - assertThat(consumer.received.get("klaus")).containsExactly(klaus1, klaus2, klaus3, klaus4); - assertThat(consumer.received.get("peter")).containsExactly(peter1, peter2, peter3, peter4, peter5, peter6, peter7, peter8); - }); - } - - - static class Consumer - { - final MultiValueMap received = new LinkedMultiValueMap<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public void receive(ConsumerRecord record) - { - log.debug("Received message: {}", record); - received.add(record.key(), record.value()); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/WordData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/WordData.java new file mode 100644 index 0000000..7ae2c8b --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/WordData.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WordData +{ + String user; + String word; +}