From f218e0680d909919bfc85952d7865b26818311ea Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 24 Jan 2024 22:35:39 +0100 Subject: [PATCH] WIP --- .../java/de/juplo/kafka/SimpleProducer.java | 8 +++- .../java/de/juplo/kafka/ApplicationTests.java | 45 +++---------------- 2 files changed, 12 insertions(+), 41 deletions(-) diff --git a/src/main/java/de/juplo/kafka/SimpleProducer.java b/src/main/java/de/juplo/kafka/SimpleProducer.java index 06d28ff..2b5b441 100644 --- a/src/main/java/de/juplo/kafka/SimpleProducer.java +++ b/src/main/java/de/juplo/kafka/SimpleProducer.java @@ -7,6 +7,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.concurrent.CompletableFuture; @Slf4j @@ -56,8 +57,9 @@ public class SimpleProducer } } - void send(String key, String value) + CompletableFuture send(String key, String value) { + final CompletableFuture result = new CompletableFuture<>(); final long time = System.currentTimeMillis(); final ProducerRecord record = new ProducerRecord<>( @@ -83,6 +85,7 @@ public class SimpleProducer metadata.timestamp(), now - time ); + result.complete(metadata.offset()); } else { @@ -95,6 +98,7 @@ public class SimpleProducer now - time, e.toString() ); + result.completeExceptionally(e); } }); @@ -106,6 +110,8 @@ public class SimpleProducer record.key(), now - time ); + + return result; } diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 9c94204..bfb665f 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,30 +1,18 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.test.web.servlet.MockMvc; import java.time.Duration; -import java.util.LinkedList; -import java.util.List; +import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.ApplicationTests.PARTITIONS; import static de.juplo.kafka.ApplicationTests.TOPIC; import static org.awaitility.Awaitility.*; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @ExtendWith(SpringExtension.class) @@ -50,34 +38,11 @@ public class ApplicationTests TOPIC, "B"); + CompletableFuture result; - mockMvc - .perform(post("/peter").content("Hallo Welt!")) - .andExpect(status().isOk()); - await("Message was send") + result = instanceA.send("a","message #1"); + await("Sending of 1. message for A is completed") .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() == 1); - } - - @Test - void testSendFooMessage() throws Exception - { - mockMvc - .perform(put("/peter")) - .andExpect(status().isOk()); - await("Message was send") - .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() == 1); - } - - @Test - void testSendGreeting() throws Exception - { - mockMvc - .perform(post("/").content("peter")) - .andExpect(status().isOk()); - await("Message was send") - .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() == 1); + .until(() -> result.isDone()); } } -- 2.20.1