#!/bin/bash
-IMAGE=juplo/simple-producer:1.0-SNAPSHOT
+IMAGE=juplo/fire-forget:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
docker compose -f docker/docker-compose.yml logs setup
docker compose -f docker/docker-compose.yml ps
-docker compose -f docker/docker-compose.yml up -d producer
-sleep 5
+docker compose -f docker/docker-compose.yml up producer
-docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
-
-docker compose -f docker/docker-compose.yml stop producer
-docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
-docker compose -f docker/docker-compose.yml logs producer
+# tag::kafkacat[]
+kafkacat -b :9092 -e -q -t test | wc -l
+# end::kafkacat[]
- kafka-3
producer:
- image: juplo/simple-producer:1.0-SNAPSHOT
+ image: juplo/fire-forget:1.0-SNAPSHOT
command: kafka:9092 test producer
volumes:
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>simple-producer</artifactId>
+ <artifactId>fire-forget</artifactId>
<name>Super Simple Producer</name>
<description>A Simple Producer, programmed with pure Java, that sends messages via Kafka</description>
<version>1.0-SNAPSHOT</version>
private final String topic;
private final Producer<String, String> producer;
- private volatile boolean running = true;
- private volatile boolean done = false;
private long produced = 0;
public ExampleProducer(String broker, String topic, String clientId)
public void run()
{
- long i = 0;
-
try
{
- for (; running; i++)
+ for (int i = 0; i < 10000 ; i++)
{
send(Long.toString(i%10), Long.toString(i));
- Thread.sleep(500);
}
}
catch (Exception e)
}
finally
{
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
log.info("{}: Produced {} messages in total, exiting!", id, produced);
- done = true;
}
}
ExampleProducer instance = new ExampleProducer(broker, topic, clientId);
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
- {
- instance.running = false;
- while (!instance.done)
- {
- log.info("Waiting for main-thread...");
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {}
- }
- log.info("Shutdown completed.");
- }));
-
log.info(
"Running ExampleProducer: broker={}, topic={}, client-id={}",
broker,