Demonstration der RecordDeserializationException
authorKai Moritz <kai@juplo.de>
Sat, 9 Apr 2022 16:30:22 +0000 (18:30 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 11 Apr 2022 13:23:28 +0000 (15:23 +0200)
* EndlessConsumer wird mit dem Value-Type `Long` anstatt `String` erzeugt
* Setup für die Demonstration der DeserializationException überarbeitet

README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java

index 13176d2..72f0c60 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -24,65 +24,23 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
-
+docker-compose up setup
+docker-compose up -d producer consumer
+sleep 5
 docker-compose exec -T cli bash << 'EOF'
-echo "Creating topic with 3 partitions..."
-kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-# tag::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
-# end::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+echo "Writing poison pill into topic test..."
+# tag::poisonpill[]
+echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test
+# end::poisonpill[]
 EOF
-
-docker-compose up -d consumer
-
-docker-compose up -d producer
-sleep 10
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
+while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
+http -v :8081/actuator/health
+echo "Restarting consumer"
+http -v post :8081/start
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
+while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
+http -v :8081/actuator/health
+http -v post :8081/actuator/shutdown
 docker-compose stop producer
-docker-compose exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 3 to 7..."
-# tag::altertopic[]
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-# end::altertopic[]
-EOF
-
-docker-compose start producer
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-docker-compose stop producer consumer
+docker-compose ps
+docker-compose logs --tail=100 consumer
index 1392ae2..b03bb1e 100644 (file)
@@ -24,13 +24,13 @@ services:
     depends_on:
       - zookeeper
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
-    environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+  setup:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+      "
 
   cli:
     image: juplo/toolbox
@@ -44,7 +44,7 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 10
+      producer.throttle-ms: 200
 
 
   consumer:
index c5fca91..4054e93 100644 (file)
@@ -19,7 +19,7 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, String>> consumer()
+  public Consumer<ConsumerRecord<String, Long>> consumer()
   {
     return (record) ->
     {
@@ -28,10 +28,10 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public EndlessConsumer<String, String> endlessConsumer(
-      KafkaConsumer<String, String> kafkaConsumer,
+  public EndlessConsumer<String, Long> endlessConsumer(
+      KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, String>> handler,
+      Consumer<ConsumerRecord<String, Long>> handler,
       ApplicationProperties properties)
   {
     return
@@ -50,7 +50,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();