From d0de5c9030e575251d62d2a207ede84cd881ccec Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 5 Sep 2022 18:12:09 +0200 Subject: [PATCH] WIP --- .../juplo/kafka/ApplicationConfiguration.java | 6 +- .../de/juplo/kafka/ApplicationProperties.java | 6 -- .../java/de/juplo/kafka/ProduceFailure.java | 2 +- src/main/java/de/juplo/kafka/RestGateway.java | 85 +++++++++---------- src/main/resources/application.yml | 7 +- 5 files changed, 48 insertions(+), 58 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 1d64221..3a17625 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,6 +7,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.Properties; @@ -21,14 +22,13 @@ public class ApplicationConfiguration public RestGateway restGateway( ApplicationProperties applicationProperties, KafkaProperties kafkaProperties, - Producer kafkaProducer) + KafkaTemplate kafkaTemplate) { return new RestGateway( kafkaProperties.getClientId(), - applicationProperties.getTopic(), applicationProperties.getPartition(), - kafkaProducer); + kafkaTemplate); } @Bean(destroyMethod = "close") diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 7d5f105..d0e694d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -4,17 +4,11 @@ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; - @ConfigurationProperties(prefix = "sumup.gateway") @Getter @Setter public class ApplicationProperties { - @NotNull - @NotEmpty - private String topic; private Integer partition; } diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java index 873a67b..7c78482 100644 --- a/src/main/java/de/juplo/kafka/ProduceFailure.java +++ b/src/main/java/de/juplo/kafka/ProduceFailure.java @@ -12,7 +12,7 @@ public class ProduceFailure implements ProduceResult private final Integer status; - public ProduceFailure(Exception e) + public ProduceFailure(Throwable e) { status = 500; exception = e.getClass().getSimpleName(); diff --git a/src/main/java/de/juplo/kafka/RestGateway.java b/src/main/java/de/juplo/kafka/RestGateway.java index 53a87df..2f2b18c 100644 --- a/src/main/java/de/juplo/kafka/RestGateway.java +++ b/src/main/java/de/juplo/kafka/RestGateway.java @@ -2,9 +2,12 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; @@ -16,9 +19,8 @@ import org.springframework.web.context.request.async.DeferredResult; public class RestGateway { private final String id; - private final String topic; private final Integer partition; - private final Producer producer; + private final KafkaTemplate kafkaTemplate; private long produced = 0; @@ -32,52 +34,47 @@ public class RestGateway final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - partition, // Partition - Uses default-algorithm, if null - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); + ListenableFuture> future = + kafkaTemplate.send(null, partition, key, value); long now = System.currentTimeMillis(); + + future.addCallback( + sendResult -> + { + // HANDLE SUCCESS + produced++; + RecordMetadata metadata = sendResult.getRecordMetadata(); + ProducerRecord record = sendResult.getProducerRecord(); + result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); + log.debug( + "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); + }, + e-> + { + // HANDLE ERROR + result.setErrorResult(new ProduceFailure(e)); + log.error( + "{} - ERROR key={} latency={}ms: {}", + id, + key, + now - time, + e.toString() + ); + }); + log.trace( "{} - Queued message with key={} latency={}ms", id, - record.key(), + key, now - time ); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index deeb60a..40d0a85 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,3 @@ -sumup: - gateway: - topic: test management: endpoint: shutdown: @@ -21,7 +18,7 @@ info: group-id: ${spring.kafka.consumer.group-id} auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} auto-commit-interval-ms: ${spring.kafka.consumer.properties.auto.commit.interval.ms} - topic: ${sumup.gateway.topic} + topic: ${spring.kafka.template.default-topic} acks: ${spring.kafka.producer.acks} batch-size: ${spring.kafka.producer.batch-size} linger-ms: ${spring.kafka.producer.properties.linger.ms} @@ -45,6 +42,8 @@ spring: linger.ms: 0 delivery.timeout.ms: 20000 # 20 Sekunden request.timeout.ms: 10000 # 10 Sekunden + template: + default-topic: test logging: level: root: INFO -- 2.20.1