package de.juplo.kafka;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@SpringBootTest(
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "spring.kafka.template.default-topic=" + TOPIC
+ "spring.kafka.consumer.auto-offset-reset=earliest",
})
-@EmbeddedKafka(topics = TOPIC)
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
public class ApplicationTests
{
- public final static String TOPIC = "out";
+ static final String TOPIC = "FOO";
+ static final int PARTITIONS = 10;
+
+ @Autowired
+ MockMvc mockMvc;
+ @Autowired
+ Consumer consumer;
+
+
+ @BeforeEach
+ public void clear()
+ {
+ consumer.received.clear();
+ }
+
@Test
public void testApplicationStartup()
{
+ await("Application is healthy")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> mockMvc
+ .perform(get("/actuator/health"))
+ .andExpect(status().isOk())
+ .andExpect(jsonPath("status").value("UP")));
+ }
+
+ @Test
+ public void testSendMessage() throws Exception
+ {
+ await("Some messages were send")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> consumer.received.size() >= 1);
+ }
+
+
+ static class Consumer
+ {
+ final List<ConsumerRecord<String, String>> received = new LinkedList<>();
+
+ @KafkaListener(groupId = "TEST", topics = TOPIC)
+ public void receive(ConsumerRecord<String, String> record)
+ {
+ log.debug("Received message: {}", record);
+ received.add(record);
+ }
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ Consumer consumer()
+ {
+ return new Consumer();
+ }
}
}