- kafka-3
 
   producer:
-    image: juplo/spring-producer:1.0-SNAPSHOT
+    image: juplo/spring-producer:1.0-backpressure-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: producer
       juplo.producer.topic: test
       juplo.producer.delivery-timeout: 2147483647ms
-      juplo.producer.buffer-memory: 32768
+      juplo.producer.max-block: 2147483647ms
+      juplo.producer.max-queue-length: 10
 
   consumer:
     image: juplo/simple-consumer:1.0-SNAPSHOT
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 @Slf4j
   private final String id;
   private final String topic;
   private final Duration throttle;
+  private final int maxQueueLength;
   private final Producer<String, String> producer;
   private final Thread workerThread;
   private final Runnable closeCallback;
 
   private volatile boolean running = true;
+  private final AtomicInteger queued = new AtomicInteger(0);
   private long produced = 0;
 
 
     String id,
     String topic,
     Duration throttle,
+    int maxQueueLength,
     Producer<String, String> producer,
     Runnable closeCallback)
   {
     this.id = id;
     this.topic = topic;
     this.throttle = throttle;
+    this.maxQueueLength = maxQueueLength;
     this.producer = producer;
 
     workerThread = new Thread(this, "ExampleProducer Worker-Thread");
 
   void send(String key, String value)
   {
+    while (queued.get() >= maxQueueLength)
+    {
+      try
+      {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e)
+      {
+        log.warn("{} - Interrupted while waiting for queue to be progressed, queued={}!", queued, e);
+      }
+    }
+
     final long time = System.currentTimeMillis();
 
     final ProducerRecord<String, String> record = new ProducerRecord<>(
       value   // Value
     );
 
+    int queuedAfterSend = queued.incrementAndGet();
+
     producer.send(record, (metadata, e) ->
     {
       long now = System.currentTimeMillis();
+      int queuedAfterReceive = queued.decrementAndGet();
       if (e == null)
       {
         // HANDLE SUCCESS
         log.debug(
-          "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms",
+          "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms, queued={}",
           id,
           key,
           value,
           metadata.partition(),
           metadata.offset(),
           metadata.timestamp(),
-          now - time
+          now - time,
+          queuedAfterReceive
         );
       }
       else
       {
         // HANDLE ERROR
         log.error(
-          "{} - ERROR for message {}={}, latency={}ms: {}",
+          "{} - ERROR for message {}={}, latency={}ms, queued={}: {}",
           id,
           key,
           value,
           now - time,
+          queuedAfterReceive,
           e.toString()
         );
       }
     long now = System.currentTimeMillis();
     produced++;
     log.trace(
-      "{} - Queued message {}={}, latency={}ms",
+      "{} - Queued message {}={}, latency={}ms, queued={}",
       id,
       key,
       value,
-      now - time
+      now - time,
+      queuedAfterSend
     );
   }