From 1a8e3028e60b0790d15320bbb5e60575278eb43f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 11 Feb 2025 22:35:27 +0100 Subject: [PATCH] Umbau auf `MessageConverter` (Nur eine Methode, zweite wird gefiltert) --- README.sh | 2 +- build.gradle | 2 +- docker/docker-compose.yml | 9 ++-- pom.xml | 2 +- .../kafka/AddNumberRecordFilterStrategy.java | 52 +++++++++++++++++++ .../juplo/kafka/ApplicationConfiguration.java | 16 ++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 7 ++- src/main/resources/application.yml | 4 +- 8 files changed, 79 insertions(+), 15 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java create mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java diff --git a/README.sh b/README.sh index ca477732..bdeb8b22 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-messageconverter-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index c228f3b5..ad9289e8 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-kafkahandler-SNAPSHOT' +version = '1.1-messageconverter-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 30af6acc..196fb7fd 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -173,17 +173,16 @@ services: - kafka-3 producer: - image: juplo/spring-producer:2.0-json-SNAPSHOT + image: juplo/spring-producer:2.0-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: producer juplo.producer.topic: test cpu_period: 100000 cpu_quota: 50000 - mem_limit: 100m consumer: - image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -192,7 +191,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: peter @@ -201,7 +200,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT + image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: ute diff --git a/pom.xml b/pom.xml index 6eb47f0c..851b1c92 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-kafkahandler-SNAPSHOT + 1.1-messageconverter-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java b/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java new file mode 100644 index 00000000..da5f80f9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java @@ -0,0 +1,52 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; + + +@Component +@Slf4j +public class AddNumberRecordFilterStrategy implements RecordFilterStrategy +{ + public final String HEADER_NAME = DEFAULT_CLASSID_FIELD_NAME; + public final String TYPE = "ADD"; + + @Override + public boolean filter(ConsumerRecord record) + { + Header header = record.headers().lastHeader("__TypeId__"); + if (header == null) + { + log.error( + "Header {} is missing! Filtering topic={}, partition={}, offset={}", + HEADER_NAME, + record.topic(), + record.partition(), + record.offset()); + + return true; + } + + String type = new String(header.value(), StandardCharsets.UTF_8); + if (!TYPE.equals(type)) + { + log.info( + "Filtering message of type \"{}\" at: topic={}, partition={}, offset={}", + type, + record.topic(), + record.partition(), + record.offset()); + + return true; + } + + return false; + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 00000000..e6f8f9ba --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,16 @@ +package de.juplo.kafka; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.converter.JacksonJsonMessageConverter; + + +@Configuration +public class ApplicationConfiguration +{ + @Bean + JacksonJsonMessageConverter jsonMessageConverter() + { + return new JacksonJsonMessageConverter(); + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 639d82fd..2efce13c 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,14 +2,12 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Slf4j @Component -@KafkaListener(topics = "${juplo.consumer.topic}") public class ExampleConsumer { @Value("${spring.kafka.client-id}") @@ -17,13 +15,14 @@ public class ExampleConsumer private long consumed = 0; - @KafkaHandler + @KafkaListener( + topics = "${juplo.consumer.topic}", + filter = "addNumberRecordFilterStrategy") private void addNumber(MessageAddNumber addNumber) { log.info("{} - Adding number {}", id, addNumber.getNext()); } - @KafkaHandler private void calcSum(MessageCalculateSum calculateSum) { log.info("{} - Calculating sum", id); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8c09c9c1..8ca8a1bd 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -27,9 +27,7 @@ spring: client-id: DEV consumer: group-id: my-group - value-deserializer: org.springframework.kafka.support.serializer.JacksonJsonDeserializer - properties: - "[spring.json.type.mapping]": ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum + value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer level: root: INFO de.juplo: DEBUG -- 2.39.5