#!/bin/bash
-IMAGE=juplo/spring-producer:1.0-backpressure-SNAPSHOT
+IMAGE=juplo/spring-producer:1.0-auditheaders-SNAPSHOT
if [ "$1" = "cleanup" ]
then
fi
docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf setup producer
+docker compose -f docker/docker-compose.yml rm -svf setup peter ute
if [[
$(docker image ls -q $IMAGE) == "" ||
docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
-docker compose -f docker/docker-compose.yml up -d producer
-sleep 5
+docker compose -f docker/docker-compose.yml up -d peter ute
+sleep 10
-docker compose -f docker/docker-compose.yml pause kafka-1 kafka-3
-sleep 20
-docker compose -f docker/docker-compose.yml unpause kafka-1 kafka-3
-sleep 3
-
-docker compose -f docker/docker-compose.yml stop producer
echo
echo "Empfangen:"
-docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -q -J
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100
-docker compose -f docker/docker-compose.yml logs producer | grep ERROR
+echo
+echo "Empfangen (mit Headern!):"
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -c 100 -J
- kafka-2
- kafka-3
- producer:
- image: juplo/spring-producer:1.0-backpressure-SNAPSHOT
+ peter:
+ image: juplo/spring-producer:1.0-auditheaders-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
- juplo.client-id: producer
+ juplo.client-id: peter
juplo.producer.topic: test
+ juplo.producer.linger: 300ms
+ juplo.producer.throttle: 66ms
juplo.producer.delivery-timeout: 2147483647ms
- juplo.producer.max-block: 2147483647ms
- juplo.producer.max-queue-length: 10
+ juplo.producer.max-queue-length: 100
+
+ ute:
+ image: juplo/spring-producer:1.0-auditheaders-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: ute
+ juplo.producer.topic: test
+ juplo.producer.lingers: 500ms
+ juplo.producer.throttle: 30ms
+ juplo.producer.delivery-timeout: 2147483647ms
+ juplo.producer.max-queue-length: 50
consumer:
image: juplo/simple-consumer:1.0-SNAPSHOT
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
<description>A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs</description>
- <version>1.0-backpressure-SNAPSHOT</version>
+ <version>1.0-auditheaders-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import java.math.BigInteger;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class ExampleProducer implements Runnable
{
+ public final static String HEADER_ID = "id";
+ public final static String HEADER_QUEUED = "#";
+
private final String id;
private final String topic;
private final Duration throttle;
int queuedAfterSend = queued.incrementAndGet();
+ record
+ .headers()
+ .add(HEADER_ID, id.getBytes())
+ .add(HEADER_QUEUED, BigInteger.valueOf(queuedAfterSend).toByteArray());
+
producer.send(record, (metadata, e) ->
{
long now = System.currentTimeMillis();