splitter: 1.0.0-vanilla-kafka - Fixed the test "Context Loads"
authorKai Moritz <kai@juplo.de>
Sun, 26 Jun 2022 08:54:39 +0000 (10:54 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 18:13:36 +0000 (20:13 +0200)
* Added `spring-kafka-test` as dependency.
* Added `@EmbeddedKafka` to the integration test.
* Configured the test to support transactions (replication factor == 1).
* Fixed the `SplitterStreamProcessor`: the application logic is executed
  in a background thread, because it blocks the startup of Spring Boot
  otherwise.

pom.xml
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index cc61fcf..f413864 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
index 791e164..191428c 100644 (file)
@@ -9,8 +9,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.task.TaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.Assert;
 
@@ -26,7 +25,7 @@ import java.util.stream.Collectors;
 
 @Component
 @Slf4j
-public class SplitterStreamProcessor implements ApplicationRunner
+public class SplitterStreamProcessor implements Runnable
 {
   final static Pattern PATTERN = Pattern.compile("\\W+");
 
@@ -45,7 +44,8 @@ public class SplitterStreamProcessor implements ApplicationRunner
 
   public SplitterStreamProcessor(
       SplitterApplicationProperties properties,
-      Clock clock)
+      Clock clock,
+      TaskExecutor executor)
   {
     this.inputTopic = properties.getInputTopic();
     this.outputTopic = properties.getOutputTopic();
@@ -72,10 +72,11 @@ public class SplitterStreamProcessor implements ApplicationRunner
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
     producer = new KafkaProducer<>(props);
+
+    executor.execute(this);
   }
 
-  @Override
-  public void run(ApplicationArguments args)
+  public void run()
   {
     running.lock();
 
index 77266a9..5e79b87 100644 (file)
@@ -1,13 +1,37 @@
 package de.juplo.kafka.wordcount.splitter;
 
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
 
-@SpringBootTest
+import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_IN;
+import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT;
+
+
+@SpringBootTest(
+               properties = {
+                               "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
+                               "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT,
+               })
+@EmbeddedKafka(
+               topics = { TOPIC_IN, TOPIC_OUT },
+               brokerProperties = {
+                               "transaction.state.log.replication.factor=1",
+                               "transaction.state.log.min.isr=1",
+               })
 class ApplicationTests
 {
+       final static String TOPIC_IN = "in";
+       final static String TOPIC_OUT = "out";
+
+       @Autowired
+       SplitterStreamProcessor splitter;
+
        @Test
        void contextLoads()
        {
+               splitter.stop();
        }
 }