From d95119d9b40c397c5a9d87855791671a62f9eb3a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 13 Nov 2022 22:14:00 +0100 Subject: [PATCH] Vorlage --- README.sh | 15 ++------------- .../de/juplo/kafka/ApplicationConfiguration.java | 2 +- src/main/java/de/juplo/kafka/SimpleConsumer.java | 8 ++++---- src/main/resources/application.yml | 6 ++---- 4 files changed, 9 insertions(+), 22 deletions(-) diff --git a/README.sh b/README.sh index a5a4774..a6c932e 100755 --- a/README.sh +++ b/README.sh @@ -34,19 +34,8 @@ while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Wait while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done -# tag::nachrichten[] echo 6 | http -v :8080/peter echo 77 | http -v :8080/klaus -# end::nachrichten[] -echo "Writing poison pill..." -# tag::poisonpill[] -echo 'BOOM!' | kafkacat -P -b :9092 -t test -# end::poisonpill[] - -docker-compose -f docker/docker-compose.yml logs -f consumer-1 consumer-2 - -echo "Restarting consumer-1..." -# tag::restart[] -docker-compose -f docker/docker-compose.yml up consumer-1 -# end::restart[] +sleep 5 +docker-compose -f docker/docker-compose.yml logs consumer-1 consumer-2 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a8b3e1d..62d61a2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -15,7 +15,7 @@ public class ApplicationConfiguration { @Bean public SimpleConsumer simpleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 45f9b94..bd754fc 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -18,7 +18,7 @@ public class SimpleConsumer implements Callable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private long consumed = 0; @@ -33,11 +33,11 @@ public class SimpleConsumer implements Callable while (true) { - ConsumerRecords records = + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), @@ -72,7 +72,7 @@ public class SimpleConsumer implements Callable Integer partition, Long offset, String key, - Message value) + Object value) { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 07d0625..c62240e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,13 +30,11 @@ spring: auto-offset-reset: earliest auto-commit-interval: 5s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor metadata.max.age.ms: 1000 - spring.json.type.mapping: > - ADD:de.juplo.kafka.MessageAddNumber, - CALC:de.juplo.kafka.MessageCalculateSum + spring.json.type.mapping: TODO logging: level: root: INFO -- 2.20.1