From: Kai Moritz Date: Sun, 26 Jun 2022 08:54:39 +0000 (+0200) Subject: splitter: 1.0.0-vanilla-kafka - Fixed the test "Context Loads" X-Git-Tag: wip-test-vanilla-kafka~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7a3d748f77f7eb2754dd1db63d087ebdd8b8a3e2;p=demos%2Fkafka%2Fwordcount splitter: 1.0.0-vanilla-kafka - Fixed the test "Context Loads" * Added `spring-kafka-test` as dependency. * Added `@EmbeddedKafka` to the integration test. * Configured the test to support transactions (replication factor == 1). * Fixed the `SplitterStreamProcessor`: the application logic is executed in a background thread, because it blocks the startup of Spring Boot otherwise. --- diff --git a/pom.xml b/pom.xml index cc61fcf..f413864 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,11 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 51d22b3..191428c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -9,8 +9,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; +import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.Assert; @@ -18,8 +17,6 @@ import javax.annotation.PreDestroy; import java.time.Clock; import java.time.Duration; import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @@ -28,7 +25,7 @@ import java.util.stream.Collectors; @Component @Slf4j -public class SplitterStreamProcessor implements ApplicationRunner +public class SplitterStreamProcessor implements Runnable { final static Pattern PATTERN = Pattern.compile("\\W+"); @@ -47,7 +44,8 @@ public class SplitterStreamProcessor implements ApplicationRunner public SplitterStreamProcessor( SplitterApplicationProperties properties, - Clock clock) + Clock clock, + TaskExecutor executor) { this.inputTopic = properties.getInputTopic(); this.outputTopic = properties.getOutputTopic(); @@ -74,10 +72,11 @@ public class SplitterStreamProcessor implements ApplicationRunner props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producer = new KafkaProducer<>(props); + + executor.execute(this); } - @Override - public void run(ApplicationArguments args) + public void run() { running.lock(); diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java index 77266a9..f02762b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java @@ -1,13 +1,37 @@ package de.juplo.kafka.wordcount.splitter; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.context.EmbeddedKafka; -@SpringBootTest +import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_IN; +import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT; + + +@SpringBootTest( + properties = { + "juplo.wordcount.splitter.input-topic=" + TOPIC_IN, + "juplo.wordcount.splitter.outtput-topic=" + TOPIC_OUT, + }) +@EmbeddedKafka( + bootstrapServersProperty = "juplo.wordcount.splitter.bootstrap-server", + topics = { TOPIC_IN, TOPIC_OUT }, + brokerProperties = { + "transaction.state.log.replication.factor=1", + "transaction.state.log.min.isr=1", + }) class ApplicationTests { + final static String TOPIC_IN = "in"; + final static String TOPIC_OUT = "out"; + + @Autowired + SplitterStreamProcessor splitter; + @Test void contextLoads() { + splitter.stop(); } }