From e0d3872018d67445881d1da2d7fbfed3909d0649 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 13 Nov 2022 22:14:00 +0100 Subject: [PATCH] Vorlage --- README.sh | 6 ----- docker-compose.yml | 22 ------------------- pom.xml | 4 ---- .../juplo/kafka/ApplicationConfiguration.java | 2 +- .../java/de/juplo/kafka/SimpleConsumer.java | 6 ++--- src/main/resources/application.yml | 6 ++--- 6 files changed, 6 insertions(+), 40 deletions(-) diff --git a/README.sh b/README.sh index e61188f..07deaa4 100755 --- a/README.sh +++ b/README.sh @@ -30,19 +30,13 @@ while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Wait while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done -# tag::nachrichten[] echo 6 | http -v :8080/peter echo 77 | http -v :8080/klaus -# end::nachrichten[] echo "Writing poison pill..." -# tag::poisonpill[] echo 'BOOM!' | kafkacat -P -b :9092 -t test -# end::poisonpill[] docker-compose logs -f consumer-1 consumer-2 echo "Restarting consumer-1..." -# tag::restart[] docker-compose up consumer-1 -# end::restart[] diff --git a/docker-compose.yml b/docker-compose.yml index b0260ab..263f569 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -115,25 +115,3 @@ services: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - - consumer-1: - image: juplo/spring-consumer-json:1.0-SNAPSHOT - ports: - - 8081:8080 - environment: - server.port: 8080 - spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.client-id: consumer-1 - spring.kafka.consumer.group-id: my-group - simple.consumer.topic: test - - consumer-2: - image: juplo/spring-consumer-json:1.0-SNAPSHOT - ports: - - 8082:8080 - environment: - server.port: 8080 - spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.client-id: consumer-2 - spring.kafka.consumer.group-id: my-group - simple.consumer.topic: test diff --git a/pom.xml b/pom.xml index 1165685..e474870 100644 --- a/pom.xml +++ b/pom.xml @@ -61,10 +61,6 @@ - - pl.project13.maven - git-commit-id-plugin - org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a8b3e1d..62d61a2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -15,7 +15,7 @@ public class ApplicationConfiguration { @Bean public SimpleConsumer simpleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 45f9b94..8e557c2 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -18,7 +18,7 @@ public class SimpleConsumer implements Callable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private long consumed = 0; @@ -33,11 +33,11 @@ public class SimpleConsumer implements Callable while (true) { - ConsumerRecords records = + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 07d0625..c62240e 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,13 +30,11 @@ spring: auto-offset-reset: earliest auto-commit-interval: 5s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor metadata.max.age.ms: 1000 - spring.json.type.mapping: > - ADD:de.juplo.kafka.MessageAddNumber, - CALC:de.juplo.kafka.MessageCalculateSum + spring.json.type.mapping: TODO logging: level: root: INFO -- 2.20.1