X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=cf70c81c8719ca46dab819d53b729a5c87642b7f;hb=330b7238f03cdfafd72f9980ce0f019c46b02afe;hp=0000000000000000000000000000000000000000;hpb=d7b66a40c2f72926c2c4b02e04eb97749898e16e;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java new file mode 100644 index 0000000..cf70c81 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -0,0 +1,86 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import static de.juplo.kafka.ApplicationTests.PARTITIONS; +import static de.juplo.kafka.ApplicationTests.TOPIC; +import static org.awaitility.Awaitility.*; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", + "producer.bootstrap-server=${spring.embedded.kafka.brokers}", + "producer.topic=" + TOPIC}) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@Slf4j +public class ApplicationTests +{ + static final String TOPIC = "FOO"; + static final int PARTITIONS = 10; + + @Autowired + MockMvc mockMvc; + @Autowired + Consumer consumer; + + + @BeforeEach + public void clear() + { + consumer.received.clear(); + } + + + @Test + void testSendMessage() throws Exception + { + mockMvc + .perform(post("/peter").content("Hallo Welt!")) + .andExpect(status().isOk()); + await("Message was send") + .atMost(Duration.ofSeconds(5)) + .until(() -> consumer.received.size() == 1); + } + + + static class Consumer + { + final List> received = new LinkedList<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC) + public void receive(ConsumerRecord record) + { + log.debug("Received message: {}", record); + received.add(record); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +}