From 84df469189ed6b83cac0eaee1684a69d55bb1bd4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Nov 2024 17:30:57 +0100 Subject: [PATCH] Version des ``spring-consumer``, die ``long``-Werte erwartet --- README.sh | 21 +++++++++++-------- docker/docker-compose.yml | 17 +++++++++------ pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 7 ++++--- .../java/de/juplo/kafka/ExampleConsumer.java | 10 ++++----- 5 files changed, 33 insertions(+), 24 deletions(-) diff --git a/README.sh b/README.sh index b46e235..f4696e0 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-long-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -10,7 +10,7 @@ then fi docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer +docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2 if [[ $(docker image ls -q $IMAGE) == "" || @@ -27,13 +27,16 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer +docker compose -f docker/docker-compose.yml up -d consumer-1 consumer-2 +sleep 15 -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer +docker compose -f docker/docker-compose.yml stop producer -docker compose -f docker/docker-compose.yml start consumer -sleep 5 +echo +echo "Von consumer-1 empfangen:" +docker compose -f docker/docker-compose.yml logs consumer-1 | grep '\ test\/.' +echo +echo "Von consumer-2 empfangen:" +docker compose -f docker/docker-compose.yml logs consumer-2 | grep '\ test\/.' -docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer +docker compose -f docker/docker-compose.yml stop consumer-1 consumer-2 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6bd2766..b899570 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -190,19 +190,24 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-long-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer juplo.producer.topic: test - juplo.producer.linger-ms: 666 - juplo.producer.throttle-ms: 100 - consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + consumer-1: + image: juplo/spring-consumer:1.1-long-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 - juplo.client-id: consumer + juplo.client-id: consumer-1 + juplo.consumer.topic: test + + consumer-2: + image: juplo/spring-consumer:1.1-long-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: consumer-2 juplo.consumer.topic: test volumes: diff --git a/pom.xml b/pom.xml index 98a0a36..c32791f 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-long-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a4856a6..37f0bf6 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -3,6 +3,7 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -18,7 +19,7 @@ public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -31,7 +32,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -48,7 +49,7 @@ public class ApplicationConfiguration props.put("metadata.maxage.ms", 5000); // 5 Sekunden props.put("partition.assignment.strategy", StickyAssignor.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", LongDeserializer.class.getName()); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f832b45..faa4c0a 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -15,7 +15,7 @@ public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -26,7 +26,7 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, - Consumer consumer, + Consumer consumer, Runnable closeCallback) { this.id = clientId; @@ -51,11 +51,11 @@ public class ExampleConsumer implements Runnable while (running) { - ConsumerRecords records = + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), @@ -90,7 +90,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - String value) + Long value) { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); -- 2.20.1