- kafka-3
producer:
- image: juplo/spring-producer:1.0-SNAPSHOT
+ image: juplo/spring-producer:1.0-backpressure-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: producer
juplo.producer.topic: test
juplo.producer.delivery-timeout: 2147483647ms
- juplo.producer.buffer-memory: 32768
+ juplo.producer.max-block: 2147483647ms
+ juplo.producer.max-queue-length: 10
consumer:
image: juplo/simple-consumer:1.0-SNAPSHOT
import org.apache.kafka.clients.producer.ProducerRecord;
import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
private final String id;
private final String topic;
private final Duration throttle;
+ private final int maxQueueLength;
private final Producer<String, String> producer;
private final Thread workerThread;
private final Runnable closeCallback;
private volatile boolean running = true;
+ private final AtomicInteger queued = new AtomicInteger(0);
private long produced = 0;
String id,
String topic,
Duration throttle,
+ int maxQueueLength,
Producer<String, String> producer,
Runnable closeCallback)
{
this.id = id;
this.topic = topic;
this.throttle = throttle;
+ this.maxQueueLength = maxQueueLength;
this.producer = producer;
workerThread = new Thread(this, "ExampleProducer Worker-Thread");
void send(String key, String value)
{
+ while (queued.get() >= maxQueueLength)
+ {
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("{} - Interrupted while waiting for queue to be progressed, queued={}!", queued, e);
+ }
+ }
+
final long time = System.currentTimeMillis();
final ProducerRecord<String, String> record = new ProducerRecord<>(
value // Value
);
+ int queuedAfterSend = queued.incrementAndGet();
+
producer.send(record, (metadata, e) ->
{
long now = System.currentTimeMillis();
+ int queuedAfterReceive = queued.decrementAndGet();
if (e == null)
{
// HANDLE SUCCESS
produced++;
log.debug(
- "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
+ "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms, queued={}",
id,
record.key(),
record.value(),
metadata.partition(),
metadata.offset(),
metadata.timestamp(),
- now - time
+ now - time,
+ queuedAfterReceive
);
}
else
{
// HANDLE ERROR
log.error(
- "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
+ "{} - ERROR for message {}={}, timestamp={}, latency={}ms, queued={}: {}",
id,
record.key(),
record.value(),
metadata == null ? -1 : metadata.timestamp(),
now - time,
+ queuedAfterReceive,
e.toString()
);
}
long now = System.currentTimeMillis();
log.trace(
- "{} - Queued message {}={}, latency={}ms",
+ "{} - Queued message {}={}, latency={}ms, queued={}",
id,
record.key(),
record.value(),
- now - time
+ now - time,
+ queuedAfterSend
);
}