From c1defc35245b4427f80d8eb0fda71e5212b38a80 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 24 Jan 2024 22:05:09 +0100 Subject: [PATCH] WIP --- .../java/de/juplo/kafka/ApplicationTests.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/ApplicationTests.java diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java new file mode 100644 index 0000000..9c94204 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -0,0 +1,83 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.web.servlet.MockMvc; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import static de.juplo.kafka.ApplicationTests.PARTITIONS; +import static de.juplo.kafka.ApplicationTests.TOPIC; +import static org.awaitility.Awaitility.*; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@ExtendWith(SpringExtension.class) +@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@Slf4j +public class ApplicationTests +{ + static final String TOPIC = "FOO"; + static final int PARTITIONS = 10; + + @Value("${spring.embedded.kafka.brokers}") + String bootstrapServers; + + @Test + void testConcurrentProducers() throws Exception + { + SimpleProducer instanceA = new SimpleProducer( + bootstrapServers, + TOPIC, + "A"); + SimpleProducer instanceB = new SimpleProducer( + bootstrapServers, + TOPIC, + "B"); + + + mockMvc + .perform(post("/peter").content("Hallo Welt!")) + .andExpect(status().isOk()); + await("Message was send") + .atMost(Duration.ofSeconds(5)) + .until(() -> consumer.received.size() == 1); + } + + @Test + void testSendFooMessage() throws Exception + { + mockMvc + .perform(put("/peter")) + .andExpect(status().isOk()); + await("Message was send") + .atMost(Duration.ofSeconds(5)) + .until(() -> consumer.received.size() == 1); + } + + @Test + void testSendGreeting() throws Exception + { + mockMvc + .perform(post("/").content("peter")) + .andExpect(status().isOk()); + await("Message was send") + .atMost(Duration.ofSeconds(5)) + .until(() -> consumer.received.size() == 1); + } +} -- 2.20.1