Empfangen unterschiedlicher Typen mit dem `MessageConverter` spring/spring-consumer--messageconverter spring/spring-consumer--messageconverter--2025-03-signal spring/spring-consumer--messageconverter--2025-04-signal
authorKai Moritz <kai@juplo.de>
Fri, 14 Feb 2025 14:30:34 +0000 (15:30 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 25 Mar 2025 19:05:05 +0000 (20:05 +0100)
src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

diff --git a/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java b/src/main/java/de/juplo/kafka/AddNumberRecordFilterStrategy.java
deleted file mode 100644 (file)
index da5f80f..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
-import org.springframework.stereotype.Component;
-
-import java.nio.charset.StandardCharsets;
-
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
-
-
-@Component
-@Slf4j
-public class AddNumberRecordFilterStrategy implements RecordFilterStrategy
-{
-  public final String HEADER_NAME = DEFAULT_CLASSID_FIELD_NAME;
-  public final String TYPE = "ADD";
-
-  @Override
-  public boolean filter(ConsumerRecord record)
-  {
-    Header header = record.headers().lastHeader("__TypeId__");
-    if (header == null)
-    {
-      log.error(
-        "Header {} is missing! Filtering topic={}, partition={}, offset={}",
-        HEADER_NAME,
-        record.topic(),
-        record.partition(),
-        record.offset());
-
-      return true;
-    }
-
-    String type = new String(header.value(), StandardCharsets.UTF_8);
-    if (!TYPE.equals(type))
-    {
-      log.info(
-        "Filtering message of type \"{}\" at: topic={}, partition={}, offset={}",
-        type,
-        record.topic(),
-        record.partition(),
-        record.offset());
-
-      return true;
-    }
-
-    return false;
-  }
-}
index 46dd5be..efa1308 100644 (file)
@@ -4,6 +4,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.converter.JsonMessageConverter;
+import org.springframework.kafka.support.converter.StringJsonMessageConverter;
+import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
+import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
+
+import java.util.HashMap;
+import java.util.Map;
 
 
 @Configuration
@@ -15,6 +21,18 @@ public class ApplicationConfiguration
   @Bean
   JsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper)
   {
-    return new JsonMessageConverter(objectMapper);
+    JsonMessageConverter converter = new JsonMessageConverter();
+    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
+
+    Map<String, Class<?>> typeMappings = new HashMap<>();
+    typeMappings.put("ADD", MessageAddNumber.class);
+    typeMappings.put("CALC", MessageCalculateSum.class);
+
+    typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
+    typeMapper.setIdClassMapping(typeMappings);
+
+    converter.setTypeMapper(typeMapper);
+
+    return converter;
   }
 }
index 7dec6be..639d82f 100644 (file)
@@ -2,12 +2,14 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaHandler;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
 
 @Slf4j
 @Component
+@KafkaListener(topics = "${juplo.consumer.topic}")
 public class ExampleConsumer
 {
   @Value("${spring.kafka.client-id}")
@@ -15,12 +17,13 @@ public class ExampleConsumer
   private long consumed = 0;
 
 
-  @KafkaListener(topics = "${juplo.consumer.topic}")
+  @KafkaHandler
   private void addNumber(MessageAddNumber addNumber)
   {
     log.info("{} - Adding number {}", id, addNumber.getNext());
   }
 
+  @KafkaHandler
   private void calcSum(MessageCalculateSum calculateSum)
   {
     log.info("{} - Calculating sum", id);