From 723ec6bbc03aaa4f552fd3a27986c120ea46fccf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 Jun 2022 10:54:39 +0200 Subject: [PATCH] splitter: 1.0.0-vanilla-kafka - Fixed the test "Context Loads" * Added `spring-kafka-test` as dependency. * Added `@EmbeddedKafka` to the integration test. * Configured the test to support transactions (replication factor == 1). * Fixed the `SplitterStreamProcessor`: the application logic is executed in a background thread, because it blocks the startup of Spring Boot otherwise. --- pom.xml | 5 ++++ .../splitter/SplitterStreamProcessor.java | 13 +++++----- .../wordcount/splitter/ApplicationTests.java | 26 ++++++++++++++++++- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/pom.xml b/pom.xml index cc61fcf..f413864 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,11 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 791e164..191428c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -9,8 +9,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; +import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.Assert; @@ -26,7 +25,7 @@ import java.util.stream.Collectors; @Component @Slf4j -public class SplitterStreamProcessor implements ApplicationRunner +public class SplitterStreamProcessor implements Runnable { final static Pattern PATTERN = Pattern.compile("\\W+"); @@ -45,7 +44,8 @@ public class SplitterStreamProcessor implements ApplicationRunner public SplitterStreamProcessor( SplitterApplicationProperties properties, - Clock clock) + Clock clock, + TaskExecutor executor) { this.inputTopic = properties.getInputTopic(); this.outputTopic = properties.getOutputTopic(); @@ -72,10 +72,11 @@ public class SplitterStreamProcessor implements ApplicationRunner props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producer = new KafkaProducer<>(props); + + executor.execute(this); } - @Override - public void run(ApplicationArguments args) + public void run() { running.lock(); diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java index 77266a9..5e79b87 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java @@ -1,13 +1,37 @@ package de.juplo.kafka.wordcount.splitter; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.context.EmbeddedKafka; -@SpringBootTest +import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_IN; +import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT; + + +@SpringBootTest( + properties = { + "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.splitter.input-topic=" + TOPIC_IN, + "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT, + }) +@EmbeddedKafka( + topics = { TOPIC_IN, TOPIC_OUT }, + brokerProperties = { + "transaction.state.log.replication.factor=1", + "transaction.state.log.min.isr=1", + }) class ApplicationTests { + final static String TOPIC_IN = "in"; + final static String TOPIC_OUT = "out"; + + @Autowired + SplitterStreamProcessor splitter; + @Test void contextLoads() { + splitter.stop(); } } -- 2.20.1