WIP
authorKai Moritz <kai@juplo.de>
Mon, 5 Sep 2022 16:12:09 +0000 (18:12 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 5 Sep 2022 16:12:09 +0000 (18:12 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ProduceFailure.java
src/main/java/de/juplo/kafka/RestGateway.java
src/main/resources/application.yml

index 1d64221..3a17625 100644 (file)
@@ -7,6 +7,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
 
 import java.util.Properties;
@@ -21,14 +22,13 @@ public class ApplicationConfiguration
   public RestGateway restGateway(
       ApplicationProperties applicationProperties,
       KafkaProperties kafkaProperties,
-      Producer<String, Integer> kafkaProducer)
+      KafkaTemplate<String, Integer> kafkaTemplate)
   {
     return
         new RestGateway(
             kafkaProperties.getClientId(),
-            applicationProperties.getTopic(),
             applicationProperties.getPartition(),
-            kafkaProducer);
+            kafkaTemplate);
   }
 
   @Bean(destroyMethod = "close")
index 7d5f105..d0e694d 100644 (file)
@@ -4,17 +4,11 @@ import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
-
 
 @ConfigurationProperties(prefix = "sumup.gateway")
 @Getter
 @Setter
 public class ApplicationProperties
 {
-  @NotNull
-  @NotEmpty
-  private String topic;
   private Integer partition;
 }
index 873a67b..7c78482 100644 (file)
@@ -12,7 +12,7 @@ public class ProduceFailure implements ProduceResult
   private final Integer status;
 
 
-  public ProduceFailure(Exception e)
+  public ProduceFailure(Throwable e)
   {
     status = 500;
     exception = e.getClass().getSimpleName();
index 53a87df..2f2b18c 100644 (file)
@@ -2,9 +2,12 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.springframework.http.HttpStatus;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
@@ -16,9 +19,8 @@ import org.springframework.web.context.request.async.DeferredResult;
 public class RestGateway
 {
   private final String id;
-  private final String topic;
   private final Integer partition;
-  private final Producer<String, Integer> producer;
+  private final KafkaTemplate<String, Integer> kafkaTemplate;
 
   private long produced = 0;
 
@@ -32,52 +34,47 @@ public class RestGateway
 
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, Integer> record = new ProducerRecord<>(
-        topic,  // Topic
-        partition, // Partition - Uses default-algorithm, if null
-        key,    // Key
-        value   // Value
-    );
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
-      {
-        // HANDLE SUCCESS
-        produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
-        log.debug(
-            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
-            id,
-            record.key(),
-            record.value(),
-            metadata.partition(),
-            metadata.offset(),
-            metadata.timestamp(),
-            now - time
-        );
-      }
-      else
-      {
-        // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
-        log.error(
-            "{} - ERROR key={} timestamp={} latency={}ms: {}",
-            id,
-            record.key(),
-            metadata == null ? -1 : metadata.timestamp(),
-            now - time,
-            e.toString()
-        );
-      }
-    });
+    ListenableFuture<SendResult<String, Integer>> future =
+        kafkaTemplate.send(null, partition, key, value);
 
     long now = System.currentTimeMillis();
+
+    future.addCallback(
+        sendResult ->
+        {
+          // HANDLE SUCCESS
+          produced++;
+          RecordMetadata metadata = sendResult.getRecordMetadata();
+          ProducerRecord<String, Integer> record = sendResult.getProducerRecord();
+          result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+          log.debug(
+              "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+              id,
+              record.key(),
+              record.value(),
+              metadata.partition(),
+              metadata.offset(),
+              metadata.timestamp(),
+              now - time
+          );
+        },
+        e->
+        {
+          // HANDLE ERROR
+          result.setErrorResult(new ProduceFailure(e));
+          log.error(
+              "{} - ERROR key={} latency={}ms: {}",
+              id,
+              key,
+              now - time,
+              e.toString()
+          );
+        });
+
     log.trace(
         "{} - Queued message with key={} latency={}ms",
         id,
-        record.key(),
+        key,
         now - time
     );
 
index deeb60a..40d0a85 100644 (file)
@@ -1,6 +1,3 @@
-sumup:
-  gateway:
-    topic: test
 management:
   endpoint:
     shutdown:
@@ -21,7 +18,7 @@ info:
     group-id: ${spring.kafka.consumer.group-id}
     auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
     auto-commit-interval-ms: ${spring.kafka.consumer.properties.auto.commit.interval.ms}
-    topic: ${sumup.gateway.topic}
+    topic: ${spring.kafka.template.default-topic}
     acks: ${spring.kafka.producer.acks}
     batch-size: ${spring.kafka.producer.batch-size}
     linger-ms: ${spring.kafka.producer.properties.linger.ms}
@@ -45,6 +42,8 @@ spring:
         linger.ms: 0
         delivery.timeout.ms: 20000 # 20 Sekunden
         request.timeout.ms: 10000 # 10 Sekunden
+    template:
+      default-topic: test
 logging:
   level:
     root: INFO