#!/bin/bash
-IMAGE=juplo/supersimple-producer:1.0-SNAPSHOT
+IMAGE=juplo/supersimple-producer-endless:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
- kafka-3
acks-all:
- image: juplo/supersimple-producer:1.0-SNAPSHOT
+ image: juplo/supersimple-producer-endless:1.0-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: acks-all
spring.kafka.producer.acks: all
+ command: --key=klaus
acks-1:
- image: juplo/supersimple-producer:1.0-SNAPSHOT
+ image: juplo/supersimple-producer-endless:1.0-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: acks-1
spring.kafka.producer.acks: 1
+ command: --key=peter
acks-0:
- image: juplo/supersimple-producer:1.0-SNAPSHOT
+ image: juplo/supersimple-producer-endless:1.0-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: acks-0
spring.kafka.producer.acks: 0
-
- consumer:
- image: juplo/toolbox
- command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
+ command: --key=ute
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>supersimple-producer</artifactId>
+ <artifactId>supersimple-producer-endless</artifactId>
<name>Super Simple Producer</name>
<description>Most minimal Kafka Producer ever!</description>
<version>1.0-SNAPSHOT</version>
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
+import java.util.List;
+
@Slf4j
@SpringBootApplication
@Override
public void run(ApplicationArguments args)
{
+ List<String> keys = args.getOptionValues("key");
+ if (keys == null || keys.size() != 1)
+ {
+ log.error("Exactly one key has to be given with --key");
+ return;
+ }
+ String key = keys.get(0);
for (int i = 0; true; i++)
{
ListenableFuture<SendResult<String, String>> listenableFuture =
- kafkaTemplate.send("test", Long.toString(i%10), Long.toString(i));
+ kafkaTemplate.send("test", key, Long.toString(i));
listenableFuture.addCallback(
result -> log.info(