<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
package de.juplo.kafka.wordcount.splitter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_IN;
import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT;
+import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(
"juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
"juplo.wordcount.splitter.output-topic=" + TOPIC_OUT,
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
})
@EmbeddedKafka(
topics = { TOPIC_IN, TOPIC_OUT },
@Autowired
SplitterStreamProcessor splitter;
+ @Autowired
+ KafkaTemplate<String, String> kafkaTemplate;
+ @Autowired
+ Listener listener;
+
+ @BeforeEach
+ public void clear()
+ {
+ listener.keys.clear();
+ listener.words.clear();
+ }
+
@Test
void contextLoads()
{
splitter.stop();
}
+
+ @Test
+ void split()
+ {
+ kafkaTemplate.send(TOPIC_IN, "beate", "Hello World!");
+
+ Awaitility
+ .await("Receive two words")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() ->
+ assertThat(listener.words.size())
+ .describedAs("Received two words")
+ .isEqualTo(2));
+
+ assertThat(listener.keys)
+ .describedAs("Received unexpected keys")
+ .containsExactly("beate", "beate");
+ assertThat(listener.words)
+ .describedAs("Received unexpected words")
+ .containsExactly("Hello", "World");
+ }
+
+
+ static class Listener
+ {
+ final List<String> keys = new LinkedList<>();
+ final List<String> words = new LinkedList<>();
+
+ @KafkaListener(groupId = "peter", topics = TOPIC_OUT)
+ public void receive(ConsumerRecord<String, String> record)
+ {
+ keys.add(record.key());
+ words.add(record.value());
+ }
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ Listener listener()
+ {
+ return new Listener();
+ }
+ }
}