#!/bin/bash
-IMAGE=juplo/simple-producer:1.0-SNAPSHOT
+IMAGE=juplo/fire-forget:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
docker-compose -f docker/docker-compose.yml logs setup
docker-compose -f docker/docker-compose.yml ps
-docker-compose -f docker/docker-compose.yml up -d producer
-sleep 5
+docker-compose -f docker/docker-compose.yml up producer
-docker-compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
-
-docker-compose -f docker/docker-compose.yml stop producer
-docker-compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
-docker-compose -f docker/docker-compose.yml logs producer
+# tag::kafkacat[]
+kafkacat -b :9092 -e -q -t test | wc -l
+# end::kafkacat[]
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>simple-producer</artifactId>
+ <artifactId>fire-forget</artifactId>
<name>Super Simple Producer</name>
<version>1.0-SNAPSHOT</version>
private final Producer<String, String> producer;
private long produced = 0;
- private volatile boolean running = true;
- private volatile boolean done = false;
public SimpleProducer(String broker, String topic, String clientId)
{
public void run()
{
- long i = 0;
-
try
{
- for (; running ; i++)
+ for (int i = 0; i < 10000 ; i++)
{
send(Long.toString(i%10), Long.toString(i));
- Thread.sleep(500);
}
}
- catch (InterruptedException e) {}
finally
{
- log.info("{}: Closing the KafkaProducer", id);
- producer.close();
log.info("{}: Produced {} messages in total, exiting!", id, produced);
- done = true;
}
}
SimpleProducer instance = new SimpleProducer(broker, topic, clientId);
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
- {
- instance.running = false;
- while (!instance.done)
- {
- log.info("Waiting for main-thread...");
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException e) {}
- }
- log.info("Shutdown completed.");
- }));
-
log.info(
"Running SimpleProducer: broker={}, topic={}, client-id={}",
broker,