+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.web.servlet.MockMvc;
-
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.List;
-
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
-import static org.awaitility.Awaitility.*;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
-
-@SpringBootTest(
- properties = {
- "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
- "producer.topic=" + TOPIC})
-@AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@Slf4j
-public class ApplicationTests
-{
- static final String TOPIC = "FOO";
- static final int PARTITIONS = 10;
-
- @Autowired
- MockMvc mockMvc;
- @Autowired
- Consumer consumer;
-
-
- @BeforeEach
- public void clear()
- {
- consumer.received.clear();
- }
-
-
- @Test
- void testSendMessage() throws Exception
- {
- mockMvc
- .perform(post("/peter").content("Hallo Welt!"))
- .andExpect(status().isOk());
- await("Message was send")
- .atMost(Duration.ofSeconds(5))
- .until(() -> consumer.received.size() == 1);
- }
-
-
- static class Consumer
- {
- final List<ConsumerRecord<String, String>> received = new LinkedList<>();
-
- @KafkaListener(groupId = "TEST", topics = TOPIC)
- public void receive(ConsumerRecord<String, String> record)
- {
- log.debug("Received message: {}", record);
- received.add(record);
- }
- }
-
- @TestConfiguration
- static class Configuration
- {
- @Bean
- Consumer consumer()
- {
- return new Consumer();
- }
- }
-}