#!/bin/bash
-IMAGE=juplo/spring-producer:1.0-SNAPSHOT
+IMAGE=juplo/spring-producer:1.0-json-SNAPSHOT
if [ "$1" = "cleanup" ]
then
- kafka-3
producer:
- image: juplo/spring-producer:1.0-SNAPSHOT
+ image: juplo/spring-producer:1.0-json-SNAPSHOT
environment:
juplo.bootstrap-server: kafka:9092
juplo.client-id: producer
juplo.producer.topic: test
consumer-1:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer-1
+ image: juplo/spring-consumer:1.1-json-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: consumer-1
+ juplo.consumer.topic: test
consumer-2:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer-2
+ image: juplo/spring-consumer:1.1-json-SNAPSHOT
+ environment:
+ juplo.bootstrap-server: kafka:9092
+ juplo.client-id: consumer-2
+ juplo.consumer.topic: test
volumes:
zookeeper-data:
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
<description>A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs</description>
- <version>1.0-SNAPSHOT</version>
+ <version>1.0-json-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class AddNumberMessage implements SumupMessage
+{
+ private final int number;
+ private final int next;
+}
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.time.Duration;
import java.util.Properties;
@Bean
public ExampleProducer exampleProducer(
ApplicationProperties properties,
- Producer<String, String> kafkaProducer,
+ Producer<String, SumupMessage> kafkaProducer,
ConfigurableApplicationContext applicationContext)
{
return
}
@Bean(destroyMethod = "")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, SumupMessage> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis());
props.put("compression.type", properties.getProducerProperties().getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", JsonSerializer.class.getName());
+ props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.AddNumberMessage,CALC:de.juplo.kafka.CalculateSumMessage");
return new KafkaProducer<>(props);
}
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class CalculateSumMessage implements SumupMessage
+{
+ private final int number;
+}
private final String id;
private final String topic;
private final Duration throttle;
- private final Producer<String, String> producer;
+ private final Producer<String, SumupMessage> producer;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- Producer<String, String> producer,
+ Producer<String, SumupMessage> producer,
Runnable closeCallback)
{
this.id = id;
@Override
public void run()
{
- long i = 0;
+ int i = 0;
try
{
for (; running; i++)
{
- send(Long.toString(i%10), Long.toString(i));
+ int number = i % 10;
+ SumupMessage message = (i % 7 == 0)
+ ? new CalculateSumMessage(number)
+ : new AddNumberMessage(number, i);
+
+ send(Long.toString(number), message);
if (throttle.isPositive())
{
}
}
- void send(String key, String value)
+ void send(String key, SumupMessage value)
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
+ final ProducerRecord<String, SumupMessage> record = new ProducerRecord<>(
topic, // Topic
key, // Key
value // Value
--- /dev/null
+package de.juplo.kafka;
+
+public interface SumupMessage
+{
+}