From: Kai Moritz Date: Sun, 13 Apr 2025 09:12:38 +0000 (+0200) Subject: Handling der Nachricht in das Interface `RecordHandler` verlegt X-Git-Tag: consumer/spring-consumer--record-handler--2025-04-signal-spickzettel X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fconsumer%2Fspring-consumer--record-handler;p=demos%2Fkafka%2Ftraining Handling der Nachricht in das Interface `RecordHandler` verlegt --- diff --git a/README.sh b/README.sh index a7cbb894..203c22bd 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-generics-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-record-handler-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 7e70329c..f700918b 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-generics-SNAPSHOT' +version = '1.1-record-handler-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index e5b9a9fe..bf306f68 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -140,7 +140,7 @@ services: command: kafka:9092 test producer consumer: - image: juplo/spring-consumer:1.1-generics-SNAPSHOT + image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -149,7 +149,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-generics-SNAPSHOT + image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -158,7 +158,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-generics-SNAPSHOT + image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer diff --git a/pom.xml b/pom.xml index 250f40d9..7590d25a 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-generics-SNAPSHOT + 1.1-record-handler-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index e523a203..87331b34 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -11,11 +12,13 @@ import org.springframework.kafka.core.ConsumerFactory; @Configuration @EnableConfigurationProperties(ApplicationProperties.class) +@Slf4j public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, + RecordHandler recordHandler, ApplicationProperties properties, KafkaProperties kafkaProperties, ConfigurableApplicationContext applicationContext) @@ -25,9 +28,16 @@ public class ApplicationConfiguration kafkaProperties.getClientId(), properties.getConsumerProperties().getTopic(), kafkaConsumer, + recordHandler, () -> applicationContext.close()); } + @Bean + public RecordHandler recordHandler() + { + return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value); + } + @Bean(destroyMethod = "") public Consumer kafkaConsumer(ConsumerFactory consumerFactory) { diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 101abd1c..e5a8b3d5 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -16,6 +16,7 @@ public class ExampleConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; + private final RecordHandler recordHandler; private final Thread workerThread; private final Runnable closeCallback; @@ -26,11 +27,13 @@ public class ExampleConsumer implements Runnable String clientId, String topic, Consumer consumer, + RecordHandler recordHandler, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; + this.recordHandler = recordHandler; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -91,6 +94,7 @@ public class ExampleConsumer implements Runnable { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); + recordHandler.handleRecord(topic, partition, offset, key, value); } diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java new file mode 100644 index 00000000..a7b65af2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +public interface RecordHandler +{ + void handleRecord( + String topic, + Integer partition, + Long offset, + K key, + V value); +}