Merge der Upgrades für Confluent/Spring-Boot (Branch 'customized')
authorKai Moritz <kai@juplo.de>
Fri, 22 Jul 2022 19:53:48 +0000 (21:53 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Jul 2022 19:53:48 +0000 (21:53 +0200)
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java [new file with mode: 0644]

index b3a8b13..ad9276b 100644 (file)
@@ -1,14 +1,14 @@
 version: '3.2'
 services:
   zookeeper:
-    image: confluentinc/cp-zookeeper:7.0.2
+    image: confluentinc/cp-zookeeper:7.1.3
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
     ports:
       - 2181:2181
 
   kafka:
-    image: confluentinc/cp-kafka:7.0.2
+    image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
diff --git a/pom.xml b/pom.xml
index 129ea94..e4d24bb 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -7,13 +7,14 @@
   <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
-    <version>2.6.5</version>
+    <version>2.7.2</version>
     <relativePath/> <!-- lookup parent from repository -->
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
   <artifactId>rest-producer</artifactId>
-  <name>REST Producer: a Simple Producer that takes messages via POST and confirms successs</name>
+  <name>REST Producer</name>
+  <description>A Simple Producer that takes messages via POST and confirms successs</description>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
       <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>build-info</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>pl.project13.maven</groupId>
+        <artifactId>git-commit-id-plugin</artifactId>
       </plugin>
       <plugin>
         <groupId>io.fabric8</groupId>
index 408cd2f..a040bb0 100644 (file)
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
@@ -114,6 +115,13 @@ public class RestProducer
     return result;
   }
 
+  @ExceptionHandler
+  @ResponseStatus(HttpStatus.BAD_REQUEST)
+  public ErrorResponse illegalStateException(IllegalStateException e)
+  {
+    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+  }
+
   @PreDestroy
   public void destroy() throws ExecutionException, InterruptedException
   {
index fcc0f3c..726204e 100644 (file)
@@ -1,17 +1,36 @@
 producer:
   bootstrap-server: :9092
-  client-id: peter
+  client-id: DEV
   topic: test
   acks: -1
   batch-size: 16384
   linger-ms: 0
   compression-type: gzip
 management:
+  endpoint:
+    shutdown:
+      enabled: true
   endpoints:
     web:
       exposure:
         include: "*"
+  info:
+    env:
+      enabled: true
+    java:
+      enabled: true
+info:
+  kafka:
+    bootstrap-server: ${producer.bootstrap-server}
+    client-id: ${producer.client-id}
+    topic: ${producer.topic}
+    acks: ${producer.acks}
+    batch-size: ${producer.batch-size}
+    linger-ms: ${producer.linger-ms}
+    compression-type: ${producer.compression-type}
 logging:
   level:
     root: INFO
     de.juplo: DEBUG
+server:
+  port: 8880
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
new file mode 100644 (file)
index 0000000..cf70c81
--- /dev/null
@@ -0,0 +1,86 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.*;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "producer.topic=" + TOPIC})
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
+public class ApplicationTests
+{
+       static final String TOPIC = "FOO";
+       static final int PARTITIONS = 10;
+
+       @Autowired
+       MockMvc mockMvc;
+       @Autowired
+       Consumer consumer;
+
+
+       @BeforeEach
+       public void clear()
+       {
+               consumer.received.clear();
+       }
+
+
+       @Test
+       void testSendMessage() throws Exception
+       {
+               mockMvc
+                               .perform(post("/peter").content("Hallo Welt!"))
+                               .andExpect(status().isOk());
+               await("Message was send")
+                               .atMost(Duration.ofSeconds(5))
+                               .until(() -> consumer.received.size() == 1);
+       }
+
+
+       static class Consumer
+       {
+               final List<ConsumerRecord<String, String>> received = new LinkedList<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC)
+               public void receive(ConsumerRecord<String, String> record)
+               {
+                       log.debug("Received message: {}", record);
+                       received.add(record);
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+       }
+}