+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
-import org.springframework.stereotype.Component;
-
-import java.nio.charset.StandardCharsets;
-
-import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
-
-
-@Component
-@Slf4j
-public class AddNumberRecordFilterStrategy implements RecordFilterStrategy
-{
- public final String HEADER_NAME = DEFAULT_CLASSID_FIELD_NAME;
- public final String TYPE = "ADD";
-
- @Override
- public boolean filter(ConsumerRecord record)
- {
- Header header = record.headers().lastHeader("__TypeId__");
- if (header == null)
- {
- log.error(
- "Header {} is missing! Filtering topic={}, partition={}, offset={}",
- HEADER_NAME,
- record.topic(),
- record.partition(),
- record.offset());
-
- return true;
- }
-
- String type = new String(header.value(), StandardCharsets.UTF_8);
- if (!TYPE.equals(type))
- {
- log.info(
- "Filtering message of type \"{}\" at: topic={}, partition={}, offset={}",
- type,
- record.topic(),
- record.partition(),
- record.offset());
-
- return true;
- }
-
- return false;
- }
-}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.JsonMessageConverter;
+import org.springframework.kafka.support.converter.StringJsonMessageConverter;
+import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
+import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper;
+
+import java.util.HashMap;
+import java.util.Map;
@Configuration
@Bean
JsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper)
{
- return new JsonMessageConverter(objectMapper);
+ JsonMessageConverter converter = new JsonMessageConverter();
+ DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
+
+ Map<String, Class<?>> typeMappings = new HashMap<>();
+ typeMappings.put("ADD", MessageAddNumber.class);
+ typeMappings.put("CALC", MessageCalculateSum.class);
+
+ typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
+ typeMapper.setIdClassMapping(typeMappings);
+
+ converter.setTypeMapper(typeMapper);
+
+ return converter;
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
+@KafkaListener(topics = "${juplo.consumer.topic}")
public class ExampleConsumer
{
@Value("${spring.kafka.client-id}")
private long consumed = 0;
- @KafkaListener(topics = "${juplo.consumer.topic}")
+ @KafkaHandler
private void addNumber(MessageAddNumber addNumber)
{
log.info("{} - Adding number {}", id, addNumber.getNext());
}
+ @KafkaHandler
private void calcSum(MessageCalculateSum calculateSum)
{
log.info("{} - Calculating sum", id);