From 87dfbae7121737c4fe1b7b996b628f5c3a8b0bdb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 13 Nov 2022 22:14:00 +0100 Subject: [PATCH] Vorlage --- README.sh | 25 ++++--------------- docker-compose.yml | 22 ---------------- pom.xml | 4 --- .../juplo/kafka/ApplicationConfiguration.java | 2 +- .../java/de/juplo/kafka/SimpleConsumer.java | 6 ++--- src/main/resources/application.yml | 6 ++--- 6 files changed, 11 insertions(+), 54 deletions(-) diff --git a/README.sh b/README.sh index e61188f..4f4e3ea 100755 --- a/README.sh +++ b/README.sh @@ -10,7 +10,6 @@ then fi docker-compose up -d kafka-1 kafka-2 kafka-3 cli -docker-compose rm -svf consumer-1 consumer-2 if [[ $(docker image ls -q $IMAGE) == "" || @@ -23,26 +22,12 @@ else docker image ls $IMAGE fi -docker-compose up -d -docker-compose logs -f setup +docker-compose up setup +docker-compose ps +docker-compose up -d producer while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done -while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done -while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done -# tag::nachrichten[] -echo 6 | http -v :8080/peter -echo 77 | http -v :8080/klaus -# end::nachrichten[] +echo -n 3 | http -v :8080/foo; -echo "Writing poison pill..." -# tag::poisonpill[] -echo 'BOOM!' | kafkacat -P -b :9092 -t test -# end::poisonpill[] - -docker-compose logs -f consumer-1 consumer-2 - -echo "Restarting consumer-1..." -# tag::restart[] -docker-compose up consumer-1 -# end::restart[] +kafkacat -b :9092 -t test -o 0 -e -f 'p=%p|o=%o|k=%k|h=%h|v=%s\n' diff --git a/docker-compose.yml b/docker-compose.yml index 7324b30..b47f679 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -132,25 +132,3 @@ services: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - - consumer-1: - image: juplo/spring-consumer-json:1.0-SNAPSHOT - ports: - - 8081:8080 - environment: - server.port: 8080 - spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.client-id: consumer-1 - spring.kafka.consumer.group-id: my-group - simple.consumer.topic: test - - consumer-2: - image: juplo/spring-consumer-json:1.0-SNAPSHOT - ports: - - 8082:8080 - environment: - server.port: 8080 - spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.client-id: consumer-2 - spring.kafka.consumer.group-id: my-group - simple.consumer.topic: test diff --git a/pom.xml b/pom.xml index 1165685..e474870 100644 --- a/pom.xml +++ b/pom.xml @@ -61,10 +61,6 @@ - - pl.project13.maven - git-commit-id-plugin - org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a8b3e1d..62d61a2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -15,7 +15,7 @@ public class ApplicationConfiguration { @Bean public SimpleConsumer simpleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 45f9b94..8e557c2 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -18,7 +18,7 @@ public class SimpleConsumer implements Callable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private long consumed = 0; @@ -33,11 +33,11 @@ public class SimpleConsumer implements Callable while (true) { - ConsumerRecords records = + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 07d0625..c62240e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,13 +30,11 @@ spring: auto-offset-reset: earliest auto-commit-interval: 5s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor metadata.max.age.ms: 1000 - spring.json.type.mapping: > - ADD:de.juplo.kafka.MessageAddNumber, - CALC:de.juplo.kafka.MessageCalculateSum + spring.json.type.mapping: TODO logging: level: root: INFO -- 2.20.1