Für das Versenden wird das `KafkaTemplate` von Spring verwendet
authorKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 08:43:50 +0000 (10:43 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 13:47:35 +0000 (15:47 +0200)
* Dabei wird weiterhin ein `ProducerRecord` erzeugt und dem
  `KafkaTemplate` übergeben, um die Änderungen möglichst übersichtlich
  zu halten.
* Die zuvor über `ApplicationProperties` einstellbaren Parameter wurden
  in der `application.yml` entsprechend in dem `spring.kafka.*`-Namespace
  gesetzt.
* Eintragung in den per Actuator veröffentlichten Infos angepasst.

docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ProduceFailure.java
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index a03d211..287168c 100644 (file)
@@ -41,7 +41,7 @@ services:
     ports:
       - 8080:8880
     environment:
-      producer.bootstrap-server: kafka:9092
+      spring.kafka.bootstrap-servers: kafka:9092
       producer.client-id: producer
       producer.topic: test
 
index 1f30262..7b01334 100644 (file)
@@ -9,11 +9,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 @Setter
 public class ApplicationProperties
 {
-  private String bootstrapServer;
   private String clientId;
   private String topic;
-  private String acks;
-  private Integer batchSize;
-  private Integer lingerMs;
-  private String compressionType;
 }
index 873a67b..7c78482 100644 (file)
@@ -12,7 +12,7 @@ public class ProduceFailure implements ProduceResult
   private final Integer status;
 
 
-  public ProduceFailure(Exception e)
+  public ProduceFailure(Throwable e)
   {
     status = 500;
     exception = e.getClass().getSimpleName();
index 70b327a..56f3382 100644 (file)
@@ -1,16 +1,14 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.http.HttpStatus;
-import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.annotation.PreDestroy;
-import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
 
@@ -20,32 +18,17 @@ public class RestProducer
 {
   private final String id;
   private final String topic;
-  private final KafkaProducer<String, Object> producer;
+  private final KafkaTemplate<String, Object> kafkaTemplate;
 
   private long produced = 0;
 
-  public RestProducer(ApplicationProperties properties)
+  public RestProducer(
+      ApplicationProperties properties,
+      KafkaTemplate<String, Object> kafkaTemplate)
   {
     this.id = properties.getClientId();
     this.topic = properties.getTopic();
-
-    Properties props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("client.id", properties.getClientId());
-    props.put("acks", properties.getAcks());
-    props.put("batch.size", properties.getBatchSize());
-    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
-    props.put("request.timeout.ms",  10000); // 10 Sekunden
-    props.put("linger.ms", properties.getLingerMs());
-    props.put("compression.type", properties.getCompressionType());
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", JsonSerializer.class.getName());
-    props.put(JsonSerializer.TYPE_MAPPINGS,
-        "message:" + ClientMessage.class.getName() + "," +
-        "foo:" + FooMessage.class.getName() + "," +
-        "greeting:" + Greeting.class.getName());
-
-    this.producer = new KafkaProducer<>(props);
+    this.kafkaTemplate = kafkaTemplate;
   }
 
   @PostMapping(path = "{key}")
@@ -96,12 +79,13 @@ public class RestProducer
 
     final long time = System.currentTimeMillis();
 
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
+    kafkaTemplate.send(record).addCallback(
+      (sendResult) ->
       {
+        long now = System.currentTimeMillis();
+
         // HANDLE SUCCESS
+        RecordMetadata metadata = sendResult.getRecordMetadata();
         produced++;
         result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
@@ -114,21 +98,21 @@ public class RestProducer
             metadata.timestamp(),
             now - time
         );
-      }
-      else
+      },
+      (e) ->
       {
+        long now = System.currentTimeMillis();
+
         // HANDLE ERROR
         result.setErrorResult(new ProduceFailure(e));
         log.error(
-            "{} - ERROR key={} timestamp={} latency={}ms: {}",
+            "{} - ERROR key={} timestamp=-1 latency={}ms: {}",
             id,
             record.key(),
-            metadata == null ? -1 : metadata.timestamp(),
             now - time,
             e.toString()
         );
-      }
-    });
+      });
 
     long now = System.currentTimeMillis();
     log.trace(
@@ -152,8 +136,6 @@ public class RestProducer
   public void destroy() throws ExecutionException, InterruptedException
   {
     log.info("{} - Destroy!", id);
-    log.info("{} - Closing the KafkaProducer", id);
-    producer.close();
     log.info("{}: Produced {} messages in total, exiting!", id, produced);
   }
 }
index 726204e..f54576a 100644 (file)
@@ -1,11 +1,6 @@
 producer:
-  bootstrap-server: :9092
   client-id: DEV
   topic: test
-  acks: -1
-  batch-size: 16384
-  linger-ms: 0
-  compression-type: gzip
 management:
   endpoint:
     shutdown:
@@ -21,13 +16,30 @@ management:
       enabled: true
 info:
   kafka:
-    bootstrap-server: ${producer.bootstrap-server}
+    bootstrap-servers: ${spring.kafka.bootstrap-servers}
     client-id: ${producer.client-id}
     topic: ${producer.topic}
-    acks: ${producer.acks}
-    batch-size: ${producer.batch-size}
-    linger-ms: ${producer.linger-ms}
-    compression-type: ${producer.compression-type}
+    acks: ${spring.kafka.producer.acks}
+    batch-size: ${spring.kafka.producer.batch-size}
+    linger-ms: ${spring.kafka.producer.properties.linger.ms}
+    compression-type: ${spring.kafka.producer.compression-type}
+spring:
+  kafka:
+    bootstrap-servers: :9092
+    producer:
+      acks: -1
+      batch-size: 16384
+      compression-type: gzip
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+      properties:
+        linger.ms: 0
+        delivery.timeout.ms: 20000 # 20 Sekunden
+        request.timeout.ms: 10000 # 10 Sekunden
+        spring.json.type.mapping: >
+          message:de.juplo.kafka.ClientMessage,
+          foo:de.juplo.kafka.FooMessage,
+          greeting:de.juplo.kafka.Greeting
 logging:
   level:
     root: INFO
index cd7d928..76cfe42 100644 (file)
@@ -26,8 +26,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 
 @SpringBootTest(
                properties = {
-                               "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
-                               "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
                                "producer.topic=" + TOPIC})
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)