TODO: So klappt es zwar, aber der Test führt main() nicht aus... spring/supersimple-producer--todo
authorKai Moritz <kai@juplo.de>
Thu, 23 Jan 2025 18:48:46 +0000 (19:48 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 23 Jan 2025 18:48:46 +0000 (19:48 +0100)
build.gradle
pom.xml
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 7bbe870..7dcc4f2 100644 (file)
@@ -27,7 +27,8 @@ repositories {
 }
 
 dependencies {
-       implementation 'org.springframework.boot:spring-boot-starter'
+       implementation 'org.springframework.boot:spring-boot-starter-web'
+       implementation 'org.springframework.boot:spring-boot-starter-actuator'
        implementation 'org.springframework.kafka:spring-kafka'
        compileOnly 'org.projectlombok:lombok'
        developmentOnly 'org.springframework.boot:spring-boot-devtools'
diff --git a/pom.xml b/pom.xml
index 1f08dfe..ac03bdc 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   <dependencies>
     <dependency>
       <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter</artifactId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
     </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
index 3084633..a967254 100644 (file)
@@ -39,7 +39,7 @@ public class ExampleProducer
           result.getRecordMetadata().offset()));
 
       completableFuture.exceptionally(e -> {
-        log.error("ERROR sendig message", e);
+        log.error("ERROR sending message", e);
         return null;
       });
       // end::callback[]
index 51cc46c..ab6c4be 100644 (file)
@@ -3,3 +3,16 @@ spring:
     bootstrap-servers: :9092
     template:
       default-topic: test
+management:
+  endpoint:
+    shutdown:
+      enabled: true
+  endpoints:
+    web:
+      exposure:
+        include: "*"
+  info:
+    env:
+      enabled: true
+    java:
+      enabled: true
index 714175e..3e56a84 100644 (file)
@@ -1,24 +1,95 @@
 package de.juplo.kafka;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
 
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 
 @SpringBootTest(
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "spring.kafka.template.default-topic=" + TOPIC
+        "spring.kafka.consumer.auto-offset-reset=earliest",
     })
-@EmbeddedKafka(topics = TOPIC)
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
 public class ApplicationTests
 {
-  public final static String TOPIC = "out";
+  static final String TOPIC = "FOO";
+  static final int PARTITIONS = 10;
+
+  @Autowired
+  MockMvc mockMvc;
+  @Autowired
+  Consumer consumer;
+
+
+  @BeforeEach
+  public void clear()
+  {
+    consumer.received.clear();
+  }
+
 
   @Test
   public void testApplicationStartup()
   {
+    await("Application is healthy")
+      .atMost(Duration.ofSeconds(5))
+      .untilAsserted(() -> mockMvc
+        .perform(get("/actuator/health"))
+        .andExpect(status().isOk())
+        .andExpect(jsonPath("status").value("UP")));
+  }
+
+  @Test
+  public void testSendMessage() throws Exception
+  {
+    await("Some messages were send")
+        .atMost(Duration.ofSeconds(5))
+        .until(() -> consumer.received.size() >= 1);
+  }
+
+
+  static class Consumer
+  {
+    final List<ConsumerRecord<String, String>> received = new LinkedList<>();
+
+    @KafkaListener(groupId = "TEST", topics = TOPIC)
+    public void receive(ConsumerRecord<String, String> record)
+    {
+      log.debug("Received message: {}", record);
+      received.add(record);
+    }
+  }
+
+  @TestConfiguration
+  static class Configuration
+  {
+    @Bean
+    Consumer consumer()
+    {
+      return new Consumer();
+    }
   }
 }