From b845714b4a598f7cfb3c6c5ae99112f725b2bcaf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Nov 2024 17:30:57 +0100 Subject: [PATCH] Version des ``spring-consumer``, die ``long``-Werte erwartet --- README.sh | 21 +++++++++++-------- build.gradle | 2 +- docker/docker-compose.yml | 16 +++++++------- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 7 ++++--- .../java/de/juplo/kafka/ExampleConsumer.java | 10 ++++----- 6 files changed, 31 insertions(+), 27 deletions(-) diff --git a/README.sh b/README.sh index b46e2350..86b98061 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-long-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer +docker compose -f docker/docker-compose.yml rm -svf peter ute if [[ $(docker image ls -q $IMAGE) == "" || @@ -27,13 +27,16 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer +docker compose -f docker/docker-compose.yml up -d peter ute +sleep 15 -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer +docker compose -f docker/docker-compose.yml stop producer -docker compose -f docker/docker-compose.yml start consumer -sleep 5 +echo +echo "Von peter empfangen:" +docker compose -f docker/docker-compose.yml logs peter | grep '\ test\/.' +echo +echo "Von ute empfangen:" +docker compose -f docker/docker-compose.yml logs ute | grep '\ test\/.' -docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer +docker compose -f docker/docker-compose.yml stop peter ute diff --git a/build.gradle b/build.gradle index a8614fdf..12188d65 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-SNAPSHOT' +version = '1.1-long-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 9984f6b8..3338c62a 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,29 +136,29 @@ services: - kafka-3 producer: - image: juplo/simple-producer:1.0-SNAPSHOT - command: kafka:9092 test producer + image: juplo/spring-producer:2.0-long-SNAPSHOT + environment: + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: producer + juplo.producer.topic: test consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-long-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer - juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-long-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: peter - juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-long-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: ute - juplo.consumer.topic: test volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index dd96d00f..d99ed82f 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-long-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index ea6b64ea..cb2057b8 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -17,7 +18,7 @@ public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -30,7 +31,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -46,7 +47,7 @@ public class ApplicationConfiguration } props.put("metadata.max.age.ms", 5000); // 5 Sekunden props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", LongDeserializer.class.getName()); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 1f5a5706..83a9f89b 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -15,7 +15,7 @@ public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,7 +25,7 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, - Consumer consumer, + Consumer consumer, Runnable closeCallback) { this.id = clientId; @@ -49,10 +49,10 @@ public class ExampleConsumer implements Runnable while (true) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), @@ -87,7 +87,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - String value) + Long value) { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); -- 2.20.1