splitter: Implemented a test that proofs the splitting process splitter-vanilla-kafka
authorKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 17:06:09 +0000 (19:06 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 18:38:23 +0000 (20:38 +0200)
pom.xml
src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index f413864..1938fd9 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka</artifactId>
+                       <scope>test</scope>
+               </dependency>
                <dependency>
                        <groupId>org.springframework.kafka</groupId>
                        <artifactId>spring-kafka-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.awaitility</groupId>
+                       <artifactId>awaitility</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
index 5e79b87..775d3bd 100644 (file)
@@ -1,12 +1,24 @@
 package de.juplo.kafka.wordcount.splitter;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
 import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_IN;
 import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT;
+import static org.assertj.core.api.Assertions.assertThat;
 
 
 @SpringBootTest(
@@ -14,6 +26,7 @@ import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT;
                                "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
                                "juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
                                "juplo.wordcount.splitter.output-topic=" + TOPIC_OUT,
+                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
                })
 @EmbeddedKafka(
                topics = { TOPIC_IN, TOPIC_OUT },
@@ -28,10 +41,67 @@ class ApplicationTests
 
        @Autowired
        SplitterStreamProcessor splitter;
+       @Autowired
+       KafkaTemplate<String, String> kafkaTemplate;
+       @Autowired
+       Listener listener;
+
+       @BeforeEach
+       public void clear()
+       {
+               listener.keys.clear();
+               listener.words.clear();
+       }
+
 
        @Test
        void contextLoads()
        {
                splitter.stop();
        }
+
+       @Test
+       void split()
+       {
+               kafkaTemplate.send(TOPIC_IN, "beate", "Hello World!");
+
+               Awaitility
+                               .await("Receive two words")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() ->
+                                               assertThat(listener.words.size())
+                                                               .describedAs("Received two words")
+                                                               .isEqualTo(2));
+
+               assertThat(listener.keys)
+                               .describedAs("Received unexpected keys")
+                               .containsExactly("beate", "beate");
+               assertThat(listener.words)
+                               .describedAs("Received unexpected words")
+                               .containsExactly("Hello", "World");
+       }
+
+
+       static class Listener
+       {
+               final List<String> keys = new LinkedList<>();
+               final List<String> words = new LinkedList<>();
+
+               @KafkaListener(groupId = "peter", topics = TOPIC_OUT)
+               public void receive(ConsumerRecord<String, String> record)
+               {
+                       keys.add(record.key());
+                       words.add(record.value());
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Listener listener()
+               {
+                       return new Listener();
+               }
+       }
 }