#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '1.1-kafkahandler-SNAPSHOT'
+version = '1.1-messageconverter-SNAPSHOT'
java {
toolchain {
juplo.producer.throttle-ms: 100
consumer:
- image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT
+ image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: consumer
juplo.consumer.topic: test
peter:
- image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT
+ image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: peter
juplo.consumer.topic: test
ute:
- image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT
+ image: juplo/spring-consumer:1.1-messageconverter-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: ute
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-kafkahandler-SNAPSHOT</version>
+ <version>1.1-messageconverter-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+
+
+@Component
+@Slf4j
+public class AddNumberRecordFilterStrategy implements RecordFilterStrategy
+{
+ public final String HEADER_NAME = DEFAULT_CLASSID_FIELD_NAME;
+ public final String TYPE = "ADD";
+
+ @Override
+ public boolean filter(ConsumerRecord record)
+ {
+ Header header = record.headers().lastHeader("__TypeId__");
+ if (header == null)
+ {
+ log.error(
+ "Header {} is missing! Filtering topic={}, partition={}, offset={}",
+ HEADER_NAME,
+ record.topic(),
+ record.partition(),
+ record.offset());
+
+ return true;
+ }
+
+ String type = new String(header.value(), StandardCharsets.UTF_8);
+ if (!TYPE.equals(type))
+ {
+ log.info(
+ "Filtering message of type \"{}\" at: topic={}, partition={}, offset={}",
+ type,
+ record.topic(),
+ record.partition(),
+ record.offset());
+
+ return true;
+ }
+
+ return false;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.converter.JsonMessageConverter;
+
+
+@Configuration
+public class ApplicationConfiguration
+{
+ // Contrary to what the documentation claims, it makes no difference
+ // to instantiate a ByteArrayMessageConverter, because Spring never
+ // calls the optimized method convertPayload(Message).
+ @Bean
+ JsonMessageConverter jsonMessageConverter(ObjectMapper objectMapper)
+ {
+ return new JsonMessageConverter(objectMapper);
+ }
+}
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
-@KafkaListener(topics = "${juplo.consumer.topic}")
public class ExampleConsumer
{
@Value("${spring.kafka.client-id}")
private long consumed = 0;
- @KafkaHandler
+ @KafkaListener(
+ topics = "${juplo.consumer.topic}",
+ filter = "addNumberRecordFilterStrategy")
private void addNumber(MessageAddNumber addNumber)
{
log.info("{} - Adding number {}", id, addNumber.getNext());
}
- @KafkaHandler
private void calcSum(MessageCalculateSum calculateSum)
{
log.info("{} - Calculating sum", id);
client-id: DEV
consumer:
group-id: my-group
- value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
- properties:
- "[spring.json.type.mapping]": ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum
+ value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
level:
root: INFO
de.juplo: DEBUG