From b9b4f03cc4569888a668f889fbdea220fafec2f1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Nov 2024 03:24:38 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20die=20=C3=9Cbung=20zum=20`J?= =?utf8?q?sonDeserializer`?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 39 ------------------- .../juplo/kafka/ApplicationConfiguration.java | 8 ++-- .../java/de/juplo/kafka/ExampleConsumer.java | 10 ++--- 3 files changed, 8 insertions(+), 49 deletions(-) delete mode 100755 README.sh diff --git a/README.sh b/README.sh deleted file mode 100755 index 7152ec9..0000000 --- a/README.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/spring-consumer:1.1-json-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer - -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer - -docker compose -f docker/docker-compose.yml start consumer -sleep 5 - -docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3f7422c..a4856a6 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,7 +4,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -19,7 +18,7 @@ public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -32,7 +31,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -49,8 +48,7 @@ public class ApplicationConfiguration props.put("metadata.maxage.ms", 5000); // 5 Sekunden props.put("partition.assignment.strategy", StickyAssignor.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", JsonDeserializer.class.getName()); - props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum"); + props.put("value.deserializer", StringDeserializer.class.getName()); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 654ef37..f832b45 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -15,7 +15,7 @@ public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -26,7 +26,7 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, - Consumer consumer, + Consumer consumer, Runnable closeCallback) { this.id = clientId; @@ -51,11 +51,11 @@ public class ExampleConsumer implements Runnable while (running) { - ConsumerRecords records = + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), @@ -90,7 +90,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - Message value) + String value) { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); -- 2.20.1