From a98ec2deda0a0b0cfc62e6ee7e9b9ab89db88b82 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 11 Feb 2025 22:35:27 +0100 Subject: [PATCH] Umbau auf `MessageConverter` (Nur eine Methode, zweite wird gefiltert) --- README.sh | 2 +- build.gradle | 2 +- docker/docker-compose.yml | 6 +-- pom.xml | 2 +- .../kafka/AddNumberRecordFilterStrategy.java | 52 +++++++++++++++++++ .../juplo/kafka/ApplicationConfiguration.java | 20 +++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 7 ++- src/main/resources/application.yml | 4 +- 8 files changed, 82 insertions(+), 13 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java create mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java diff --git a/README.sh b/README.sh index ca477732..bdeb8b22 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-messageconverter-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index c771b9c0..79687238 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-kafkahandler-SNAPSHOT' +version = '1.1-messageconverter-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index bb71d1ca..47b856bd 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -145,7 +145,7 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -154,7 +154,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: peter @@ -163,7 +163,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: ute diff --git a/pom.xml b/pom.xml index 284bb4a9..2b906347 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-kafkahandler-SNAPSHOT + 1.1-messageconverter-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java b/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java new file mode 100644 index 00000000..da5f80f9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java @@ -0,0 +1,52 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; + + +@Component +@Slf4j +public class AddNumberRecordFilterStrategy implements RecordFilterStrategy +{ + public final String HEADER_NAME = DEFAULT_CLASSID_FIELD_NAME; + public final String TYPE = "ADD"; + + @Override + public boolean filter(ConsumerRecord record) + { + Header header = record.headers().lastHeader("__TypeId__"); + if (header == null) + { + log.error( + "Header {} is missing! Filtering topic={}, partition={}, offset={}", + HEADER_NAME, + record.topic(), + record.partition(), + record.offset()); + + return true; + } + + String type = new String(header.value(), StandardCharsets.UTF_8); + if (!TYPE.equals(type)) + { + log.info( + "Filtering message of type \"{}\" at: topic={}, partition={}, offset={}", + type, + record.topic(), + record.partition(), + record.offset()); + + return true; + } + + return false; + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 00000000..46dd5be7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,20 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.converter.JsonMessageConverter; + + +@Configuration +public class ApplicationConfiguration +{ + // Contrary to what the documentation claims, it makes no difference + // to instantiate a ByteArrayMessageConverter, because Spring never + // calls the optimized method convertPayload(Message). + @Bean + JsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper) + { + return new JsonMessageConverter(objectMapper); + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 639d82fd..2efce13c 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,14 +2,12 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Slf4j @Component -@KafkaListener(topics = "${juplo.consumer.topic}") public class ExampleConsumer { @Value("${spring.kafka.client-id}") @@ -17,13 +15,14 @@ public class ExampleConsumer private long consumed = 0; - @KafkaHandler + @KafkaListener( + topics = "${juplo.consumer.topic}", + filter = "addNumberRecordFilterStrategy") private void addNumber(MessageAddNumber addNumber) { log.info("{} - Adding number {}", id, addNumber.getNext()); } - @KafkaHandler private void calcSum(MessageCalculateSum calculateSum) { log.info("{} - Calculating sum", id); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b3e33585..8ca8a1bd 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -27,9 +27,7 @@ spring: client-id: DEV consumer: group-id: my-group - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer - properties: - "[spring.json.type.mapping]": ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum + value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer level: root: INFO de.juplo: DEBUG -- 2.20.1