From 557562b5e5c07e95f909e6e6760f27129aad0a15 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Nov 2022 18:07:03 +0100 Subject: [PATCH] =?utf8?q?Den=20`supersimple-producer`=20f=C3=BCr=20die=20?= =?utf8?q?Acks-=C3=9Cbung=20=C3=BCberarbeitet?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Damit der Producer in der Acks-Übung wiederverwendet werden kann, muss er endlos Nachrichten produzieren. * Für die Acks-Übung muss das Producing außerdem gedrosselt laufen. * Vorführung in `README.sh` entsprechend angepasst. * Da der Producer sich nie beendet, landet der Test in einer Endlosschleife * Daher musste der Test hier erstmal deaktiviert werden :/ --- README.sh | 18 ++++++------- docker/docker-compose.yml | 25 +++++++++++++++++-- src/main/java/de/juplo/kafka/Application.java | 11 +++++++- .../java/de/juplo/kafka/ApplicationTests.java | 2 ++ 4 files changed, 44 insertions(+), 12 deletions(-) diff --git a/README.sh b/README.sh index 867070b..e1cf1d0 100755 --- a/README.sh +++ b/README.sh @@ -22,12 +22,12 @@ else docker image ls $IMAGE fi -docker-compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - -docker-compose -f docker/docker-compose.yml up -t0 -d cli -docker-compose -f docker/docker-compose.yml ps -docker-compose -f docker/docker-compose.yml up producer - -# tag::kafkacat[] -kafkacat -b :9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -# end::kafkacat[] +docker-compose -f docker/docker-compose.yml up setup +docker-compose -f docker/docker-compose.yml up -d acks-all acks-1 +sleep 5 +docker-compose -f docker/docker-compose.yml stop kafka-1 +sleep 5 +docker-compose -f docker/docker-compose.yml stop kafka-3 +sleep 5 +docker-compose -f docker/docker-compose.yml stop acks-all acks-1 +docker-compose -f docker/docker-compose.yml logs acks-all acks-1 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index ca8befa..2339e29 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -211,12 +211,33 @@ services: - kafka-2 - kafka-3 - producer: + acks-all: image: juplo/supersimple-producer:1.0-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.client-id: producer + spring.kafka.client-id: acks-all spring.kafka.template.default-topic: test + spring.kafka.producer.acks: all + + acks-1: + image: juplo/supersimple-producer:1.0-SNAPSHOT + environment: + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: acks-1 + spring.kafka.template.default-topic: test + spring.kafka.producer.acks: 1 + + acks-0: + image: juplo/supersimple-producer:1.0-SNAPSHOT + environment: + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: acks-0 + spring.kafka.template.default-topic: test + spring.kafka.producer.acks: 0 + + consumer: + image: juplo/toolbox + command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' volumes: zookeeper-data: diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 5e123dd..fb3a523 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -22,7 +22,7 @@ public class Application implements ApplicationRunner @Override public void run(ApplicationArguments args) { - for (int i = 0; i < 100; i++) + for (int i = 0; true; i++) { // end::supersimple[] // tag::callback[] @@ -41,6 +41,15 @@ public class Application implements ApplicationRunner e -> log.error("ERROR sendig message", e)); // end::callback[] // tag::supersimple[] + + try + { + Thread.sleep(500); + } + catch (InterruptedException e) + { + return; + } } } diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 714175e..7796c82 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -13,6 +14,7 @@ import static de.juplo.kafka.ApplicationTests.TOPIC; "spring.kafka.template.default-topic=" + TOPIC }) @EmbeddedKafka(topics = TOPIC) +@Disabled public class ApplicationTests { public final static String TOPIC = "out"; -- 2.20.1