From 96013a5bc40a65eb35713bccc756eea03c4f3de7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 12 Jun 2022 10:43:50 +0200 Subject: [PATCH] =?utf8?q?F=C3=BCr=20das=20Versenden=20wird=20das=20`Kafka?= =?utf8?q?Template`=20von=20Spring=20verwendet?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Dabei wird weiterhin ein `ProducerRecord` erzeugt und dem `KafkaTemplate` übergeben, um die Änderungen möglichst übersichtlich zu halten. * Die zuvor über `ApplicationProperties` einstellbaren Parameter wurden in der `application.yml` entsprechend in dem `spring.kafka.*`-Namespace gesetzt. * Eintragung in den per Actuator veröffentlichten Infos angepasst. --- docker-compose.yml | 2 +- .../de/juplo/kafka/ApplicationProperties.java | 5 -- .../java/de/juplo/kafka/ProduceFailure.java | 2 +- .../java/de/juplo/kafka/RestProducer.java | 54 +++++++------------ src/main/resources/application.yml | 32 +++++++---- .../java/de/juplo/kafka/ApplicationTests.java | 3 +- 6 files changed, 43 insertions(+), 55 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index a03d211..287168c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,7 +41,7 @@ services: ports: - 8080:8880 environment: - producer.bootstrap-server: kafka:9092 + spring.kafka.bootstrap-servers: kafka:9092 producer.client-id: producer producer.topic: test diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 1f30262..7b01334 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -9,11 +9,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @Setter public class ApplicationProperties { - private String bootstrapServer; private String clientId; private String topic; - private String acks; - private Integer batchSize; - private Integer lingerMs; - private String compressionType; } diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java index 873a67b..7c78482 100644 --- a/src/main/java/de/juplo/kafka/ProduceFailure.java +++ b/src/main/java/de/juplo/kafka/ProduceFailure.java @@ -12,7 +12,7 @@ public class ProduceFailure implements ProduceResult private final Integer status; - public ProduceFailure(Exception e) + public ProduceFailure(Throwable e) { status = 500; exception = e.getClass().getSimpleName(); diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 70b327a..56f3382 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -1,16 +1,14 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; -import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PreDestroy; -import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -20,32 +18,17 @@ public class RestProducer { private final String id; private final String topic; - private final KafkaProducer producer; + private final KafkaTemplate kafkaTemplate; private long produced = 0; - public RestProducer(ApplicationProperties properties) + public RestProducer( + ApplicationProperties properties, + KafkaTemplate kafkaTemplate) { this.id = properties.getClientId(); this.topic = properties.getTopic(); - - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getAcks()); - props.put("batch.size", properties.getBatchSize()); - props.put("delivery.timeout.ms", 20000); // 20 Sekunden - props.put("request.timeout.ms", 10000); // 10 Sekunden - props.put("linger.ms", properties.getLingerMs()); - props.put("compression.type", properties.getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", JsonSerializer.class.getName()); - props.put(JsonSerializer.TYPE_MAPPINGS, - "message:" + ClientMessage.class.getName() + "," + - "foo:" + FooMessage.class.getName() + "," + - "greeting:" + Greeting.class.getName()); - - this.producer = new KafkaProducer<>(props); + this.kafkaTemplate = kafkaTemplate; } @PostMapping(path = "{key}") @@ -96,12 +79,13 @@ public class RestProducer final long time = System.currentTimeMillis(); - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) + kafkaTemplate.send(record).addCallback( + (sendResult) -> { + long now = System.currentTimeMillis(); + // HANDLE SUCCESS + RecordMetadata metadata = sendResult.getRecordMetadata(); produced++; result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); log.debug( @@ -114,21 +98,21 @@ public class RestProducer metadata.timestamp(), now - time ); - } - else + }, + (e) -> { + long now = System.currentTimeMillis(); + // HANDLE ERROR result.setErrorResult(new ProduceFailure(e)); log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", + "{} - ERROR key={} timestamp=-1 latency={}ms: {}", id, record.key(), - metadata == null ? -1 : metadata.timestamp(), now - time, e.toString() ); - } - }); + }); long now = System.currentTimeMillis(); log.trace( @@ -152,8 +136,6 @@ public class RestProducer public void destroy() throws ExecutionException, InterruptedException { log.info("{} - Destroy!", id); - log.info("{} - Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 726204e..f54576a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,6 @@ producer: - bootstrap-server: :9092 client-id: DEV topic: test - acks: -1 - batch-size: 16384 - linger-ms: 0 - compression-type: gzip management: endpoint: shutdown: @@ -21,13 +16,30 @@ management: enabled: true info: kafka: - bootstrap-server: ${producer.bootstrap-server} + bootstrap-servers: ${spring.kafka.bootstrap-servers} client-id: ${producer.client-id} topic: ${producer.topic} - acks: ${producer.acks} - batch-size: ${producer.batch-size} - linger-ms: ${producer.linger-ms} - compression-type: ${producer.compression-type} + acks: ${spring.kafka.producer.acks} + batch-size: ${spring.kafka.producer.batch-size} + linger-ms: ${spring.kafka.producer.properties.linger.ms} + compression-type: ${spring.kafka.producer.compression-type} +spring: + kafka: + bootstrap-servers: :9092 + producer: + acks: -1 + batch-size: 16384 + compression-type: gzip + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + properties: + linger.ms: 0 + delivery.timeout.ms: 20000 # 20 Sekunden + request.timeout.ms: 10000 # 10 Sekunden + spring.json.type.mapping: > + message:de.juplo.kafka.ClientMessage, + foo:de.juplo.kafka.FooMessage, + greeting:de.juplo.kafka.Greeting logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index cd7d928..76cfe42 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -26,8 +26,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", - "producer.bootstrap-server=${spring.embedded.kafka.brokers}", + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "producer.topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -- 2.20.1