From: Kai Moritz <kai@juplo.de>
Date: Thu, 30 Jun 2022 17:06:09 +0000 (+0200)
Subject: splitter: Implemented a test that proofs the splitting process
X-Git-Tag: wip-integration-xml~10
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fsplitter-vanilla-kafka;p=demos%2Fkafka%2Fwordcount

splitter: Implemented a test that proofs the splitting process
---

diff --git a/pom.xml b/pom.xml
index f413864..1938fd9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,11 +56,26 @@
 			<artifactId>spring-boot-starter-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.kafka</groupId>
+			<artifactId>spring-kafka</artifactId>
+			<scope>test</scope>
+		</dependency>
 		<dependency>
 			<groupId>org.springframework.kafka</groupId>
 			<artifactId>spring-kafka-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.assertj</groupId>
+			<artifactId>assertj-core</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.awaitility</groupId>
+			<artifactId>awaitility</artifactId>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
index 5e79b87..775d3bd 100644
--- a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
@@ -1,12 +1,24 @@
 package de.juplo.kafka.wordcount.splitter;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
 import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_IN;
 import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT;
+import static org.assertj.core.api.Assertions.assertThat;
 
 
 @SpringBootTest(
@@ -14,6 +26,7 @@ import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT;
 				"juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
 				"juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
 				"juplo.wordcount.splitter.output-topic=" + TOPIC_OUT,
+				"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
 		})
 @EmbeddedKafka(
 		topics = { TOPIC_IN, TOPIC_OUT },
@@ -28,10 +41,67 @@ class ApplicationTests
 
 	@Autowired
 	SplitterStreamProcessor splitter;
+	@Autowired
+	KafkaTemplate<String, String> kafkaTemplate;
+	@Autowired
+	Listener listener;
+
+	@BeforeEach
+	public void clear()
+	{
+		listener.keys.clear();
+		listener.words.clear();
+	}
+
 
 	@Test
 	void contextLoads()
 	{
 		splitter.stop();
 	}
+
+	@Test
+	void split()
+	{
+		kafkaTemplate.send(TOPIC_IN, "beate", "Hello World!");
+
+		Awaitility
+				.await("Receive two words")
+				.atMost(Duration.ofSeconds(5))
+				.untilAsserted(() ->
+						assertThat(listener.words.size())
+								.describedAs("Received two words")
+								.isEqualTo(2));
+
+		assertThat(listener.keys)
+				.describedAs("Received unexpected keys")
+				.containsExactly("beate", "beate");
+		assertThat(listener.words)
+				.describedAs("Received unexpected words")
+				.containsExactly("Hello", "World");
+	}
+
+
+	static class Listener
+	{
+		final List<String> keys = new LinkedList<>();
+		final List<String> words = new LinkedList<>();
+
+		@KafkaListener(groupId = "peter", topics = TOPIC_OUT)
+		public void receive(ConsumerRecord<String, String> record)
+		{
+			keys.add(record.key());
+			words.add(record.value());
+		}
+	}
+
+	@TestConfiguration
+	static class Configuration
+	{
+		@Bean
+		Listener listener()
+		{
+			return new Listener();
+		}
+	}
 }