From: Kai Moritz Date: Sat, 9 Apr 2022 16:30:22 +0000 (+0200) Subject: Demonstration der RecordDeserializationException X-Git-Tag: deserialization-synchroner-test~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a1e7b0e201440216ba91a94aac0f4f0b3ed3d143;p=demos%2Fkafka%2Ftraining Demonstration der RecordDeserializationException * EndlessConsumer wird mit dem Value-Type `Long` anstatt `String` erzeugt * Setup für die Demonstration der DeserializationException überarbeitet --- diff --git a/README.sh b/README.sh index 13176d2..72f0c60 100755 --- a/README.sh +++ b/README.sh @@ -24,65 +24,23 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up -d kafka-ui - +docker-compose up setup +docker-compose up -d producer consumer +sleep 5 docker-compose exec -T cli bash << 'EOF' -echo "Creating topic with 3 partitions..." -kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test -# tag::createtopic[] -kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 -# end::createtopic[] -kafka-topics --bootstrap-server kafka:9092 --describe --topic test +echo "Writing poison pill into topic test..." +# tag::poisonpill[] +echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test +# end::poisonpill[] EOF - -docker-compose up -d consumer - -docker-compose up -d producer -sleep 10 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen - +while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done +http -v :8081/actuator/health +echo "Restarting consumer" +http -v post :8081/start +while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done +while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done +http -v :8081/actuator/health +http -v post :8081/actuator/shutdown docker-compose stop producer -docker-compose exec -T cli bash << 'EOF' -echo "Altering number of partitions from 3 to 7..." -# tag::altertopic[] -kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -# end::altertopic[] -EOF - -docker-compose start producer -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -sleep 1 -http -v :8081/seen -docker-compose stop producer consumer +docker-compose ps +docker-compose logs --tail=100 consumer diff --git a/docker-compose.yml b/docker-compose.yml index 1392ae2..b03bb1e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,13 +24,13 @@ services: depends_on: - zookeeper - kafka-ui: - image: provectuslabs/kafka-ui:0.3.3 - ports: - - 8080:8080 - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + setup: + image: juplo/toolbox + command: > + bash -c " + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test + kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 + " cli: image: juplo/toolbox @@ -44,7 +44,7 @@ services: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 10 + producer.throttle-ms: 200 consumer: diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c5fca91..4054e93 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -19,7 +19,7 @@ import java.util.function.Consumer; public class ApplicationConfiguration { @Bean - public Consumer> consumer() + public Consumer> consumer() { return (record) -> { @@ -28,10 +28,10 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, + public EndlessConsumer endlessConsumer( + KafkaConsumer kafkaConsumer, ExecutorService executor, - Consumer> handler, + Consumer> handler, ApplicationProperties properties) { return @@ -50,7 +50,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties();