From 6c45d0cdf44b639ae1ea9e823ae3c3db7961f38f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 1 Nov 2024 09:55:43 +0100 Subject: [PATCH] =?utf8?q?Backpressure=20f=C3=BCr=20den=20`ExampleProducer?= =?utf8?q?`=20implementiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 2 +- docker/docker-compose.yml | 5 +-- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 1 + .../de/juplo/kafka/ApplicationProperties.java | 2 ++ .../java/de/juplo/kafka/ExampleProducer.java | 33 ++++++++++++++++--- src/main/resources/application.yml | 2 ++ 7 files changed, 38 insertions(+), 9 deletions(-) diff --git a/README.sh b/README.sh index 918e83a..19419b5 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-backpressure-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index d66bb2a..6b6ce22 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -190,13 +190,14 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-backpressure-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer juplo.producer.topic: test juplo.producer.delivery-timeout: 2147483647ms - juplo.producer.buffer-memory: 32768 + juplo.producer.max-block: 2147483647ms + juplo.producer.max-queue-length: 10 consumer: image: juplo/simple-consumer:1.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 841299b..d2cb135 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs - 1.0-SNAPSHOT + 1.0-backpressure-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 7540dd3..c34dab0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -29,6 +29,7 @@ public class ApplicationConfiguration properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), + properties.getProducerProperties().getMaxQueueLength(), kafkaProducer, () -> applicationContext.close()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 4323262..f2fd951 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -55,6 +55,8 @@ public class ApplicationProperties @NotNull private Duration linger; @NotNull + private Integer maxQueueLength; + @NotNull @NotEmpty private String compressionType; private Duration throttle; diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index bc5cf89..4300eac 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -5,6 +5,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @@ -13,11 +14,13 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; + private final int maxQueueLength; private final Producer producer; private final Thread workerThread; private final Runnable closeCallback; private volatile boolean running = true; + private final AtomicInteger queued = new AtomicInteger(0); private long produced = 0; @@ -25,12 +28,14 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, + int maxQueueLength, Producer producer, Runnable closeCallback) { this.id = id; this.topic = topic; this.throttle = throttle; + this.maxQueueLength = maxQueueLength; this.producer = producer; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); @@ -80,6 +85,18 @@ public class ExampleProducer implements Runnable void send(String key, String value) { + while (queued.get() >= maxQueueLength) + { + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + log.warn("{} - Interrupted while waiting for queue to be progressed, queued={}!", queued, e); + } + } + final long time = System.currentTimeMillis(); final ProducerRecord record = new ProducerRecord<>( @@ -88,34 +105,39 @@ public class ExampleProducer implements Runnable value // Value ); + int queuedAfterSend = queued.incrementAndGet(); + producer.send(record, (metadata, e) -> { long now = System.currentTimeMillis(); + int queuedAfterReceive = queued.decrementAndGet(); if (e == null) { // HANDLE SUCCESS produced++; log.debug( - "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms", + "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms, queued={}", id, record.key(), record.value(), metadata.partition(), metadata.offset(), metadata.timestamp(), - now - time + now - time, + queuedAfterReceive ); } else { // HANDLE ERROR log.error( - "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}", + "{} - ERROR for message {}={}, timestamp={}, latency={}ms, queued={}: {}", id, record.key(), record.value(), metadata == null ? -1 : metadata.timestamp(), now - time, + queuedAfterReceive, e.toString() ); } @@ -123,11 +145,12 @@ public class ExampleProducer implements Runnable long now = System.currentTimeMillis(); log.trace( - "{} - Queued message {}={}, latency={}ms", + "{} - Queued message {}={}, latency={}ms, queued={}", id, record.key(), record.value(), - now - time + now - time, + queuedAfterSend ); } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 98ea128..3d632cd 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -9,6 +9,7 @@ juplo: buffer-memory: 33554432 batch-size: 16384 linger: 0 + max-queue-length: 100 compression-type: gzip throttle: 500 management: @@ -36,6 +37,7 @@ info: buffer-memory: ${juplo.producer.buffer-memory} batch-size: ${juplo.producer.batch-size} linger: ${juplo.producer.linger} + max-queue-length: ${juplo.producer.max-queue-length} compression-type: ${juplo.producer.compression-type} throttle: ${juplo.producer.throttle} logging: -- 2.20.1