Merge of the Confluent/Spring Boot upgrades (branch 'rest-producer')
author     Kai Moritz <kai@juplo.de>
           Fri, 22 Jul 2022 18:57:49 +0000 (20:57 +0200)
committer  Kai Moritz <kai@juplo.de>
           Fri, 22 Jul 2022 18:57:49 +0000 (20:57 +0200)
12 files changed:
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ClientMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/FooMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/Greeting.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ProduceFailure.java
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 698d6dd..832a42f 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/rest-producer:1.0-SNAPSHOT
+IMAGE=juplo/springified-producer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -29,7 +29,9 @@ docker-compose up -d
 
 sleep 15
 
-echo foo | http -v :8080/bar
+echo 'Hallo Welt!' | http -v :8080/peter
+echo peter | http -v :8080/
+http -v PUT :8080/peter
 dd if=/dev/zero bs=1024 count=1024  | http -v :8080/fehler
 http -v :8081/seen
 
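The three calls above exercise the new endpoints of the springified producer: POST /{key} with a text body produces a ClientMessage, POST / produces a Greeting for the posted name, and PUT /{key} produces a FooMessage. As a purely illustrative sketch (not part of this commit), the same requests could be issued from Java with Spring's RestTemplate; class name and URLs are assumptions:

import org.springframework.web.client.RestTemplate;

public class ReadmeCalls
{
  public static void main(String[] args)
  {
    RestTemplate rest = new RestTemplate();
    // POST /peter with a text body -> ClientMessage
    rest.postForObject("http://localhost:8080/peter", "Hallo Welt!", String.class);
    // POST / with the name as body -> Greeting
    rest.postForObject("http://localhost:8080/", "peter", String.class);
    // PUT /peter without a body -> FooMessage
    rest.put("http://localhost:8080/peter", null);
  }
}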
diff --git a/docker-compose.yml b/docker-compose.yml
index 1e8a8df..7321516 100644 (file)
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -37,13 +37,13 @@ services:
     command: sleep infinity
 
   producer:
-    image: juplo/rest-producer:1.0-SNAPSHOT
+    image: juplo/springified-producer:1.0-SNAPSHOT
     ports:
       - 8080:8880
     environment:
-      producer.bootstrap-server: kafka:9092
+      spring.kafka.bootstrap-servers: kafka:9092
       producer.client-id: producer
-      producer.topic: test
+      spring.kafka.template.default-topic: test
 
   consumer:
     image: juplo/endless-consumer:1.0-SNAPSHOT
diff --git a/pom.xml b/pom.xml
index e4d24bb..4c79bc8 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>rest-producer</artifactId>
-  <name>REST Producer</name>
-  <description>A Simple Producer that takes messages via POST and confirms successs</description>
+  <artifactId>springified-producer</artifactId>
+  <name>Springified REST Producer</name>
+  <description>A Simple Producer that is implemented with the help of Spring Kafka and takes messages via POST and confirms successs</description>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
@@ -32,8 +32,8 @@
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
@@ -45,11 +45,6 @@
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka-test</artifactId>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index 9f3e3ed..273cee5 100644 (file)
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -1,13 +1,8 @@
 package de.juplo.kafka;
 
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.concurrent.Executors;
 
 
 @SpringBootApplication
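With the producer now relying on Spring Kafka's auto-configuration, the hand-wired beans disappear from the application class. A minimal sketch of the resulting class, assuming the part below the shown hunk is just the usual main method and that @EnableConfigurationProperties still registers ApplicationProperties:

package de.juplo.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

@SpringBootApplication
@EnableConfigurationProperties(ApplicationProperties.class)
public class Application
{
  public static void main(String[] args)
  {
    SpringApplication.run(Application.class, args);
  }
}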
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index 1f30262..7b01334 100644 (file)
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -9,11 +9,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 @Setter
 public class ApplicationProperties
 {
-  private String bootstrapServer;
   private String clientId;
   private String topic;
-  private String acks;
-  private Integer batchSize;
-  private Integer lingerMs;
-  private String compressionType;
 }
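Connection and tuning settings (bootstrap server, acks, batch size, linger, compression) move out of the custom properties class into the spring.kafka.* namespace; only the application-specific client-id and topic remain. A sketch of the trimmed class, with the annotations and prefix outside the shown hunk assumed:

package de.juplo.kafka;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "producer")
@Getter
@Setter
public class ApplicationProperties
{
  private String clientId;
  private String topic;
}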
diff --git a/src/main/java/de/juplo/kafka/ClientMessage.java b/src/main/java/de/juplo/kafka/ClientMessage.java
new file mode 100644 (file)
index 0000000..042fdc4
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ClientMessage.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class ClientMessage
+{
+  private final String client;
+  private final String message;
+}
diff --git a/src/main/java/de/juplo/kafka/FooMessage.java b/src/main/java/de/juplo/kafka/FooMessage.java
new file mode 100644 (file)
index 0000000..2e9e8ba
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/FooMessage.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class FooMessage
+{
+  private final String client;
+  private final Long timestamp;
+}
diff --git a/src/main/java/de/juplo/kafka/Greeting.java b/src/main/java/de/juplo/kafka/Greeting.java
new file mode 100644 (file)
index 0000000..2df15a3
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/Greeting.java
@@ -0,0 +1,13 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+import java.time.LocalDateTime;
+
+
+@Value
+public class Greeting
+{
+  private final String name;
+  private final LocalDateTime when = LocalDateTime.now();
+}
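The three new @Value classes are the JSON payload types the producer publishes; the spring.json.type.mapping entry in application.yml (further down) assigns them the type ids message, foo and greeting. As a hedged sketch, a consumer could map those ids back to the classes with Spring Kafka's JsonDeserializer; bootstrap server and group id are chosen for illustration:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class TypedConsumerConfig
{
  public static Map<String, Object> consumerProperties()
  {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    // Same type ids as on the producer side, mapped back to the local classes
    props.put(JsonDeserializer.TYPE_MAPPINGS,
        "message:de.juplo.kafka.ClientMessage," +
        "foo:de.juplo.kafka.FooMessage," +
        "greeting:de.juplo.kafka.Greeting");
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
    return props;
  }
}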
diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java
index 873a67b..7c78482 100644 (file)
--- a/src/main/java/de/juplo/kafka/ProduceFailure.java
+++ b/src/main/java/de/juplo/kafka/ProduceFailure.java
@@ -12,7 +12,7 @@ public class ProduceFailure implements ProduceResult
   private final Integer status;
 
 
-  public ProduceFailure(Exception e)
+  public ProduceFailure(Throwable e)
   {
     status = 500;
     exception = e.getClass().getSimpleName();
diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java
index 7d9bf12..423a8a3 100644 (file)
--- a/src/main/java/de/juplo/kafka/RestProducer.java
+++ b/src/main/java/de/juplo/kafka/RestProducer.java
@@ -1,18 +1,14 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.annotation.PreDestroy;
-import java.util.Properties;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 
 
 @Slf4j
@@ -20,86 +16,88 @@ import java.util.concurrent.ExecutorService;
 public class RestProducer
 {
   private final String id;
-  private final String topic;
-  private final KafkaProducer<String, String> producer;
+  private final KafkaTemplate<String, Object> kafkaTemplate;
 
   private long produced = 0;
 
-  public RestProducer(ApplicationProperties properties)
+  public RestProducer(
+      ApplicationProperties properties,
+      KafkaTemplate<String, Object> kafkaTemplate)
   {
     this.id = properties.getClientId();
-    this.topic = properties.getTopic();
-
-    Properties props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("client.id", properties.getClientId());
-    props.put("acks", properties.getAcks());
-    props.put("batch.size", properties.getBatchSize());
-    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
-    props.put("request.timeout.ms",  10000); // 10 Sekunden
-    props.put("linger.ms", properties.getLingerMs());
-    props.put("compression.type", properties.getCompressionType());
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
-
-    this.producer = new KafkaProducer<>(props);
+    this.kafkaTemplate = kafkaTemplate;
   }
 
   @PostMapping(path = "{key}")
-  public DeferredResult<ProduceResult> send(
+  public DeferredResult<ProduceResult> message(
       @PathVariable String key,
       @RequestBody String value)
+  {
+    key = key.trim();
+    return send(key, new ClientMessage(key, value));
+  }
+
+  @PutMapping(path = "{key}")
+  public DeferredResult<ProduceResult> message(@PathVariable String key)
+  {
+    key = key.trim();
+    return send(key, new FooMessage(key, System.currentTimeMillis()));
+  }
+
+  @PostMapping(path = "/")
+  public DeferredResult<ProduceResult> greeting(
+      @RequestBody String name)
+  {
+    name = name.trim();
+    return send(name, new Greeting(name));
+  }
+
+  private DeferredResult<ProduceResult> send(String key, Object value)
   {
     DeferredResult<ProduceResult> result = new DeferredResult<>();
 
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
-        topic,  // Topic
-        key,    // Key
-        value   // Value
-    );
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
+    kafkaTemplate.sendDefault(key, value).addCallback(
+      (sendResult) ->
       {
+        long now = System.currentTimeMillis();
+
         // HANDLE SUCCESS
+        RecordMetadata metadata = sendResult.getRecordMetadata();
         produced++;
         result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
             id,
-            record.key(),
-            record.value(),
+            key,
+            value,
             metadata.partition(),
             metadata.offset(),
             metadata.timestamp(),
             now - time
         );
-      }
-      else
+      },
+      (e) ->
       {
+        long now = System.currentTimeMillis();
+
         // HANDLE ERROR
         result.setErrorResult(new ProduceFailure(e));
         log.error(
-            "{} - ERROR key={} timestamp={} latency={}ms: {}",
+            "{} - ERROR key={} timestamp=-1 latency={}ms: {}",
             id,
-            record.key(),
-            metadata == null ? -1 : metadata.timestamp(),
+            key,
             now - time,
             e.toString()
         );
-      }
-    });
+      });
 
     long now = System.currentTimeMillis();
     log.trace(
-        "{} - Queued #{} key={} latency={}ms",
+        "{} - Queued key={} latency={}ms",
         id,
-        value,
-        record.key(),
+        key,
         now - time
     );
 
@@ -117,8 +115,6 @@ public class RestProducer
   public void destroy() throws ExecutionException, InterruptedException
   {
     log.info("{} - Destroy!", id);
-    log.info("{} - Closing the KafkaProducer", id);
-    producer.close();
     log.info("{}: Produced {} messages in total, exiting!", id, produced);
   }
 }
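The hand-built KafkaProducer and its Properties are replaced by the injected KafkaTemplate: sendDefault() publishes to the configured default topic and, in Spring Kafka 2.x, returns a ListenableFuture whose addCallback() takes a success and a failure lambda, mirroring the two branches of the old producer Callback. A stripped-down sketch of that pattern (class and method names are illustrative, not from this commit):

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

@Slf4j
public class SendSketch
{
  static void send(KafkaTemplate<String, Object> template, String key, Object value)
  {
    ListenableFuture<SendResult<String, Object>> future = template.sendDefault(key, value);
    future.addCallback(
        result -> log.info("sent to partition {} at offset {}",
            result.getRecordMetadata().partition(),
            result.getRecordMetadata().offset()),
        e -> log.error("sending failed", e));
  }
}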
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 726204e..ce26258 100644 (file)
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,11 +1,6 @@
 producer:
-  bootstrap-server: :9092
   client-id: DEV
   topic: test
-  acks: -1
-  batch-size: 16384
-  linger-ms: 0
-  compression-type: gzip
 management:
   endpoint:
     shutdown:
@@ -21,13 +16,32 @@ management:
       enabled: true
 info:
   kafka:
-    bootstrap-server: ${producer.bootstrap-server}
+    bootstrap-servers: ${spring.kafka.bootstrap-servers}
     client-id: ${producer.client-id}
-    topic: ${producer.topic}
-    acks: ${producer.acks}
-    batch-size: ${producer.batch-size}
-    linger-ms: ${producer.linger-ms}
-    compression-type: ${producer.compression-type}
+    topic: ${spring.kafka.template.default-topic}
+    acks: ${spring.kafka.producer.acks}
+    batch-size: ${spring.kafka.producer.batch-size}
+    linger-ms: ${spring.kafka.producer.properties.linger.ms}
+    compression-type: ${spring.kafka.producer.compression-type}
+spring:
+  kafka:
+    bootstrap-servers: :9092
+    producer:
+      acks: -1
+      batch-size: 16384
+      compression-type: gzip
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+      properties:
+        linger.ms: 0
+        delivery.timeout.ms: 20000 # 20 Sekunden
+        request.timeout.ms: 10000 # 10 Sekunden
+        spring.json.type.mapping: >
+          message:de.juplo.kafka.ClientMessage,
+          foo:de.juplo.kafka.FooMessage,
+          greeting:de.juplo.kafka.Greeting
+    template:
+      default-topic: test
 logging:
   level:
     root: INFO
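All broker- and producer-level settings now live under spring.kafka.*, where Spring Boot's auto-configuration picks them up, while the info block merely republishes them through the actuator. For orientation only, the auto-configured template corresponds roughly to the following programmatic setup (a sketch, not code from this commit):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

public class TemplateFactorySketch
{
  public static KafkaTemplate<String, Object> kafkaTemplate()
  {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092");
    props.put(ProducerConfig.ACKS_CONFIG, "-1");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
    props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 20000);
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    props.put(JsonSerializer.TYPE_MAPPINGS,
        "message:de.juplo.kafka.ClientMessage,foo:de.juplo.kafka.FooMessage,greeting:de.juplo.kafka.Greeting");

    KafkaTemplate<String, Object> template =
        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    template.setDefaultTopic("test");
    return template;
  }
}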
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index cf70c81..0a11e44 100644 (file)
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -26,9 +26,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 
 @SpringBootTest(
                properties = {
-                               "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
-                               "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "producer.topic=" + TOPIC})
+                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.template.default-topic=" + TOPIC})
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
@@ -51,7 +50,7 @@ public class ApplicationTests
 
 
        @Test
-       void testSendMessage() throws Exception
+       void testSendClientMessage() throws Exception
        {
                mockMvc
                                .perform(post("/peter").content("Hallo Welt!"))
@@ -61,6 +60,28 @@ public class ApplicationTests
                                .until(() -> consumer.received.size() == 1);
        }
 
+       @Test
+       void testSendFooMessage() throws Exception
+       {
+               mockMvc
+                               .perform(put("/peter"))
+                               .andExpect(status().isOk());
+               await("Message was send")
+                               .atMost(Duration.ofSeconds(5))
+                               .until(() -> consumer.received.size() == 1);
+       }
+
+       @Test
+       void testSendGreeting() throws Exception
+       {
+               mockMvc
+                               .perform(post("/").content("peter"))
+                               .andExpect(status().isOk());
+               await("Message was send")
+                               .atMost(Duration.ofSeconds(5))
+                               .until(() -> consumer.received.size() == 1);
+       }
+
 
        static class Consumer
        {