#!/bin/bash
-
-IMAGE=juplo/spring-producer:2.0-kafkatemplate-SNAPSHOT
+IMAGE=juplo/spring-producer:2.0-kafkatemplate-messageconverter-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '2.0-kafkatemplate-SNAPSHOT'
+version = '2.0-kafkatemplate-messageconverter-SNAPSHOT'
java {
toolchain {
- kafka-3
producer:
- image: juplo/spring-producer:2.0-kafkatemplate-SNAPSHOT
+ image: juplo/spring-producer:2.0-kafkatemplate-messageconverter-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: producer
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
<description>A Simple Producer, based on the KafkaTemplate and Spring Boot, that sends messages via Kafka</description>
- <version>2.0-kafkatemplate-SNAPSHOT</version>
+ <version>2.0-kafkatemplate-messageconverter-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class AddNumberMessage implements SumupMessage
+{
+ private final int number;
+ private final int next;
+}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.converter.JacksonJsonMessageConverter;
+import org.springframework.kafka.support.converter.StringJacksonJsonMessageConverter;
+import org.springframework.kafka.support.mapping.DefaultJacksonJavaTypeMapper;
+import org.springframework.kafka.support.mapping.JacksonJavaTypeMapper;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
@Configuration
public ExampleProducer exampleProducer(
@Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
- KafkaTemplate<String, String> kafkaTemplate,
+ KafkaTemplate<String, SumupMessage> kafkaTemplate,
ConfigurableApplicationContext applicationContext)
{
return
: properties.getProducerProperties().getThrottle(),
kafkaTemplate,
() -> applicationContext.close());
+
+ }
+
+ @Bean
+ public KafkaTemplate<String, SumupMessage> kafkaTemplate(
+ ProducerFactory<String, SumupMessage> producerFactory,
+ JacksonJsonMessageConverter jacksonJsonMessageConverter) {
+
+ KafkaTemplate<String, SumupMessage> template = new KafkaTemplate<>(producerFactory);
+ template.setMessageConverter(jacksonJsonMessageConverter);
+
+ return template;
+ }
+
+ @Bean
+ public StringJacksonJsonMessageConverter jacksonJsonMessageConverter()
+ {
+ StringJacksonJsonMessageConverter converter = new StringJacksonJsonMessageConverter();
+ DefaultJacksonJavaTypeMapper typeMapper = new DefaultJacksonJavaTypeMapper();
+
+ // Verwende eine einfache, kurze Type-ID anstatt FQN
+ typeMapper.setTypePrecedence(JacksonJavaTypeMapper.TypePrecedence.TYPE_ID);
+ Map<String, Class<?>> typeMappings = new HashMap<>();
+ typeMappings.put("ADD", AddNumberMessage.class);
+ typeMappings.put("CALC", CalculateSumMessage.class);
+ typeMapper.setIdClassMapping(typeMappings);
+
+ converter.setTypeMapper(typeMapper);
+
+ return converter;
}
}
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class CalculateSumMessage implements SumupMessage
+{
+ private final int number;
+}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
import java.time.Duration;
private final String id;
private final String topic;
private final Duration throttle;
- private final KafkaTemplate<String, String> kafkaTemplate;
+ private final KafkaTemplate<String, SumupMessage> kafkaTemplate;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- KafkaTemplate<String, String> kafkaTemplate,
+ KafkaTemplate<String, SumupMessage> kafkaTemplate,
Runnable closeCallback)
{
this.id = id;
{
for (; running; i++)
{
- send(Long.toString(i%10), Long.toString(i));
+ int number = (int) i % 10;
+ SumupMessage message = (i % 7 == 0)
+ ? new CalculateSumMessage(number)
+ : new AddNumberMessage(number, (int)i);
+
+ send(Long.toString(number), message);
if (throttle.isPositive())
{
}
}
- void send(String key, String value)
+ void send(String key, SumupMessage value)
{
final long sendRequested = System.currentTimeMillis();
- kafkaTemplate.send(topic, key, value).whenComplete((result, e) ->
+ Message<SumupMessage> message = MessageBuilder
+ .withPayload(value)
+ .setHeader(KafkaHeaders.TOPIC, topic)
+ .setHeader(KafkaHeaders.KEY, key)
+ .build();
+
+ kafkaTemplate.send(message).whenComplete((result, e) ->
{
long sendRequestProcessed = System.currentTimeMillis();
if (e == null)
--- /dev/null
+package de.juplo.kafka;
+
+public interface SumupMessage
+{
+}