Den `supersimple-producer` für die Acks-Übung überarbeitet
authorKai Moritz <kai@juplo.de>
Sun, 20 Nov 2022 17:07:03 +0000 (18:07 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 17 Oct 2023 19:18:58 +0000 (21:18 +0200)
* Damit der Producer in der Acks-Übung wiederverwendet werden kann, muss
  er endlos Nachrichten produzieren.
* Für die Acks-Übung muss das Producing außerdem gedrosselt laufen.
* Vorführung in `README.sh` entsprechend angepasst.
* Da der Producer sich nie beendet, landet der Test in einer Endlosschleife
* Daher musste der Test hier erstmal deaktiviert werden :/

README.sh
docker/docker-compose.yml
src/main/java/de/juplo/kafka/Application.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 867070b..e1cf1d0 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -22,12 +22,12 @@ else
   docker image ls $IMAGE
 fi
 
-docker-compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
-
-docker-compose -f docker/docker-compose.yml up -t0 -d cli
-docker-compose -f docker/docker-compose.yml ps
-docker-compose -f docker/docker-compose.yml up producer
-
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
-# end::kafkacat[]
+docker-compose -f docker/docker-compose.yml up setup
+docker-compose -f docker/docker-compose.yml up -d acks-all acks-1
+sleep 5
+docker-compose -f docker/docker-compose.yml stop kafka-1
+sleep 5
+docker-compose -f docker/docker-compose.yml stop kafka-3
+sleep 5
+docker-compose -f docker/docker-compose.yml stop acks-all acks-1
+docker-compose -f docker/docker-compose.yml logs acks-all acks-1
index ca8befa..2339e29 100644 (file)
@@ -211,12 +211,33 @@ services:
       - kafka-2
       - kafka-3
 
-  producer:
+  acks-all:
     image: juplo/supersimple-producer:1.0-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
-      spring.kafka.client-id: producer
+      spring.kafka.client-id: acks-all
       spring.kafka.template.default-topic: test
+      spring.kafka.producer.acks: all
+
+  acks-1:
+    image: juplo/supersimple-producer:1.0-SNAPSHOT
+    environment:
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: acks-1
+      spring.kafka.template.default-topic: test
+      spring.kafka.producer.acks: 1
+
+  acks-0:
+    image: juplo/supersimple-producer:1.0-SNAPSHOT
+    environment:
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: acks-0
+      spring.kafka.template.default-topic: test
+      spring.kafka.producer.acks: 0
+
+  consumer:
+    image: juplo/toolbox
+    command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
 
 volumes:
   zookeeper-data:
index 5e123dd..fb3a523 100644 (file)
@@ -22,7 +22,7 @@ public class Application implements ApplicationRunner
   @Override
   public void run(ApplicationArguments args)
   {
-    for (int i = 0; i < 100; i++)
+    for (int i = 0; true; i++)
     {
       // end::supersimple[]
       // tag::callback[]
@@ -41,6 +41,15 @@ public class Application implements ApplicationRunner
           e -> log.error("ERROR sendig message", e));
       // end::callback[]
       // tag::supersimple[]
+
+      try
+      {
+        Thread.sleep(500);
+      }
+      catch (InterruptedException e)
+      {
+        return;
+      }
     }
   }
 
index 714175e..7796c82 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -13,6 +14,7 @@ import static de.juplo.kafka.ApplicationTests.TOPIC;
         "spring.kafka.template.default-topic=" + TOPIC
     })
 @EmbeddedKafka(topics = TOPIC)
+@Disabled
 public class ApplicationTests
 {
   public final static String TOPIC = "out";