#!/bin/bash
-IMAGE=juplo/spring-producer:1.0-json-SNAPSHOT
+IMAGE=juplo/spring-producer:1.0-json-messages-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '1.0-json-SNAPSHOT'
+version = '1.0-json-messages-SNAPSHOT'
java {
toolchain {
repositories {
mavenCentral()
+ mavenLocal()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
+ implementation 'de.juplo.messages:sumup-messages:1.0-SNAPSHOT'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
- kafka-3
producer:
- image: juplo/spring-producer:1.0-json-SNAPSHOT
+ image: juplo/spring-producer:1.0-json-messages-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: producer
<artifactId>spring-producer</artifactId>
<name>Spring Producer</name>
<description>A Simple Producer, based on Spring Boot, that sends messages via Kafka</description>
- <version>1.0-json-SNAPSHOT</version>
+ <version>1.0-json-messages-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
+ <sumup-messages.version>1.0-SNAPSHOT</sumup-messages.version>
</properties>
<dependencies>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
+ <dependency>
+ <groupId>de.juplo.messages</groupId>
+ <artifactId>sumup-messages</artifactId>
+ <version>${sumup-messages.version}</version>
+ </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.Value;
-
-
-@Value
-public class AddNumberMessage implements SumupMessage
-{
- private final int number;
- private final int next;
-}
package de.juplo.kafka;
+import de.juplo.messages.Message;
import org.apache.kafka.clients.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
public ExampleProducer exampleProducer(
@Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
- Producer<String, SumupMessage> kafkaProducer,
+ Producer<String, Message> kafkaProducer,
ConfigurableApplicationContext applicationContext)
{
return
+++ /dev/null
-package de.juplo.kafka;
-
-
-import lombok.Value;
-
-
-@Value
-public class CalculateSumMessage implements SumupMessage
-{
- private final int number;
-}
package de.juplo.kafka;
+import de.juplo.messages.Add;
+import de.juplo.messages.Calculate;
+import de.juplo.messages.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
private final String id;
private final String topic;
private final Duration throttle;
- private final Producer<String, SumupMessage> producer;
+ private final Producer<String, Message> producer;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- Producer<String, SumupMessage> producer,
+ Producer<String, Message> producer,
Runnable closeCallback)
{
this.id = id;
for (; running; i++)
{
int number = i % 10;
- SumupMessage message = (i % 7 == 0)
- ? new CalculateSumMessage(number)
- : new AddNumberMessage(number, i);
+ Message message = (i % 7 == 0)
+ ? Calculate.builder().number(number).build()
+ : Add.builder().number(number).next(i).build();
send(Long.toString(number), message);
}
}
- void send(String key, SumupMessage value)
+ void send(String key, Message value)
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, SumupMessage> record = new ProducerRecord<>(
+ final ProducerRecord<String, Message> record = new ProducerRecord<>(
topic, // Topic
key, // Key
value // Value
+++ /dev/null
-package de.juplo.kafka;
-
-public interface SumupMessage
-{
-}
compression-type: gzip
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
- spring.json.type.mapping: >-
- ADD:de.juplo.kafka.AddNumberMessage,
- CALC:de.juplo.kafka.CalculateSumMessage
delivery-timeout: 10s
max-block: 5s
linger: 0