From c60968bad510327de4c2a2dd5b1f3b10f6a9ea30 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 14 Feb 2025 15:30:34 +0100 Subject: [PATCH] Empfangen unterschiedlicher Typen mit dem `MessageConverter` --- .../kafka/AddNumberRecordFilterStrategy.java | 52 ------------------- .../juplo/kafka/ApplicationConfiguration.java | 20 ++++++- .../java/de/juplo/kafka/ExampleConsumer.java | 5 +- 3 files changed, 23 insertions(+), 54 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java diff --git a/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java b/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java deleted file mode 100644 index da5f80f9..00000000 --- a/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java +++ /dev/null @@ -1,52 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Header; -import org.springframework.kafka.listener.adapter.RecordFilterStrategy; -import org.springframework.stereotype.Component; - -import java.nio.charset.StandardCharsets; - -import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; - - -@Component -@Slf4j -public class AddNumberRecordFilterStrategy implements RecordFilterStrategy -{ - public final String HEADER_NAME = DEFAULT_CLASSID_FIELD_NAME; - public final String TYPE = "ADD"; - - @Override - public boolean filter(ConsumerRecord record) - { - Header header = record.headers().lastHeader("__TypeId__"); - if (header == null) - { - log.error( - "Header {} is missing! Filtering topic={}, partition={}, offset={}", - HEADER_NAME, - record.topic(), - record.partition(), - record.offset()); - - return true; - } - - String type = new String(header.value(), StandardCharsets.UTF_8); - if (!TYPE.equals(type)) - { - log.info( - "Filtering message of type \"{}\" at: topic={}, partition={}, offset={}", - type, - record.topic(), - record.partition(), - record.offset()); - - return true; - } - - return false; - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 46dd5be7..efa13088 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.converter.JsonMessageConverter; +import org.springframework.kafka.support.converter.StringJsonMessageConverter; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper; + +import java.util.HashMap; +import java.util.Map; @Configuration @@ -15,6 +21,18 @@ public class ApplicationConfiguration @Bean JsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper) { - return new JsonMessageConverter(objectMapper); + JsonMessageConverter converter = new JsonMessageConverter(); + DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper(); + + Map> typeMappings = new HashMap<>(); + typeMappings.put("ADD", MessageAddNumber.class); + typeMappings.put("CALC", MessageCalculateSum.class); + + typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID); + typeMapper.setIdClassMapping(typeMappings); + + converter.setTypeMapper(typeMapper); + + return converter; } } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 7dec6be8..639d82fd 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,12 +2,14 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Slf4j @Component +@KafkaListener(topics = "${juplo.consumer.topic}") public class ExampleConsumer { @Value("${spring.kafka.client-id}") @@ -15,12 +17,13 @@ public class ExampleConsumer private long consumed = 0; - @KafkaListener(topics = "${juplo.consumer.topic}") + @KafkaHandler private void addNumber(MessageAddNumber addNumber) { log.info("{} - Adding number {}", id, addNumber.getNext()); } + @KafkaHandler private void calcSum(MessageCalculateSum calculateSum) { log.info("{} - Calculating sum", id); -- 2.20.1