implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-web'
+ implementation 'de.juplo.messages:sumup-messages:1.0-SNAPSHOT'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
<properties>
<java.version>21</java.version>
+ <sumup-messages.version>1.0-SNAPSHOT</sumup-messages.version>
</properties>
<dependencies>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
+ <dependency>
+ <groupId>de.juplo.messages</groupId>
+ <artifactId>sumup-messages</artifactId>
+ <version>${sumup-messages.version}</version>
+ </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.Value;
-
-
-@Value
-public class AddNumberMessage implements SumupMessage
-{
- private final int number;
- private final int next;
-}
package de.juplo.kafka;
+import de.juplo.messages.Add;
+import de.juplo.messages.Calculate;
+import de.juplo.messages.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
public ExampleProducer exampleProducer(
@Value("${spring.kafka.client-id}") String clientId,
ApplicationProperties properties,
- KafkaTemplate<String, SumupMessage> kafkaTemplate,
+ KafkaTemplate<String, Message> kafkaTemplate,
ConfigurableApplicationContext applicationContext)
{
return
}
@Bean
- public KafkaTemplate<String, SumupMessage> kafkaTemplate(
- ProducerFactory<String, SumupMessage> producerFactory,
+ public KafkaTemplate<String, Message> kafkaTemplate(
+ ProducerFactory<String, Message> producerFactory,
JacksonJsonMessageConverter jacksonJsonMessageConverter) {
- KafkaTemplate<String, SumupMessage> template = new KafkaTemplate<>(producerFactory);
+ KafkaTemplate<String, Message> template = new KafkaTemplate<>(producerFactory);
template.setMessageConverter(jacksonJsonMessageConverter);
return template;
// Verwende eine einfache, kurze Type-ID anstatt FQN
typeMapper.setTypePrecedence(JacksonJavaTypeMapper.TypePrecedence.TYPE_ID);
Map<String, Class<?>> typeMappings = new HashMap<>();
- typeMappings.put("ADD", AddNumberMessage.class);
- typeMappings.put("CALC", CalculateSumMessage.class);
+ typeMappings.put("ADD", Add.class);
+ typeMappings.put("CALC", Calculate.class);
typeMapper.setIdClassMapping(typeMappings);
converter.setTypeMapper(typeMapper);
+++ /dev/null
-package de.juplo.kafka;
-
-
-import lombok.Value;
-
-
-@Value
-public class CalculateSumMessage implements SumupMessage
-{
- private final int number;
-}
package de.juplo.kafka;
+import de.juplo.messages.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.time.Duration;
private final String id;
private final String topic;
private final Duration throttle;
- private final KafkaTemplate<String, SumupMessage> kafkaTemplate;
+ private final KafkaTemplate<String, Message> kafkaTemplate;
private final Thread workerThread;
private final Runnable closeCallback;
String id,
String topic,
Duration throttle,
- KafkaTemplate<String, SumupMessage> kafkaTemplate,
+ KafkaTemplate<String, Message> kafkaTemplate,
Runnable closeCallback)
{
this.id = id;
for (; running; i++)
{
int number = (int) i % 10;
- SumupMessage message = (i % 7 == 0)
- ? new CalculateSumMessage(number)
- : new AddNumberMessage(number, (int)i);
+ Message message = (i % 7 == 0)
+ ? Calculate.builder().number(number).build()
+ : Add.builder().number(number).next((int)i).build();
send(Long.toString(number), message);
}
}
- void send(String key, SumupMessage value)
+ void send(String key, de.juplo.messages.Message value)
{
final long sendRequested = System.currentTimeMillis();
- Message<SumupMessage> message = MessageBuilder
+ org.springframework.messaging.Message<Message> message = MessageBuilder
.withPayload(value)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.KEY, key)
+++ /dev/null
-package de.juplo.kafka;
-
-public interface SumupMessage
-{
-}