docker image ls $IMAGE
fi
-docker-compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
-
-docker-compose -f docker/docker-compose.yml up -t0 -d cli
-docker-compose -f docker/docker-compose.yml ps
-docker-compose -f docker/docker-compose.yml up producer
-
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
-# end::kafkacat[]
+docker-compose -f docker/docker-compose.yml up setup
+docker-compose -f docker/docker-compose.yml up -d acks-all acks-1
+sleep 5
+docker-compose -f docker/docker-compose.yml stop kafka-1
+sleep 5
+docker-compose -f docker/docker-compose.yml stop kafka-3
+sleep 5
+docker-compose -f docker/docker-compose.yml stop acks-all acks-1
+docker-compose -f docker/docker-compose.yml logs acks-all acks-1
- kafka-2
- kafka-3
- producer:
+ acks-all:
image: juplo/supersimple-producer:1.0-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
- spring.kafka.client-id: producer
+ spring.kafka.client-id: acks-all
spring.kafka.template.default-topic: test
+ spring.kafka.producer.acks: all
+
+ acks-1:
+ image: juplo/supersimple-producer:1.0-SNAPSHOT
+ environment:
+ spring.kafka.bootstrap-servers: kafka:9092
+ spring.kafka.client-id: acks-1
+ spring.kafka.template.default-topic: test
+ spring.kafka.producer.acks: 1
+
+ acks-0:
+ image: juplo/supersimple-producer:1.0-SNAPSHOT
+ environment:
+ spring.kafka.bootstrap-servers: kafka:9092
+ spring.kafka.client-id: acks-0
+ spring.kafka.template.default-topic: test
+ spring.kafka.producer.acks: 0
+
+ consumer:
+ image: juplo/toolbox
+ command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
volumes:
zookeeper-data:
@Override
public void run(ApplicationArguments args)
{
- for (int i = 0; i < 100; i++)
+ for (int i = 0; true; i++)
{
// end::supersimple[]
// tag::callback[]
e -> log.error("ERROR sendig message", e));
// end::callback[]
// tag::supersimple[]
+
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e)
+ {
+ return;
+ }
}
}
package de.juplo.kafka;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
"spring.kafka.template.default-topic=" + TOPIC
})
@EmbeddedKafka(topics = TOPIC)
+@Disabled
public class ApplicationTests
{
public final static String TOPIC = "out";