sumup-requests verschickt JSON-Nachrichten mit Hilfe von Spring Kafka
authorKai Moritz <kai@juplo.de>
Sat, 3 Sep 2022 11:58:41 +0000 (13:58 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 3 Sep 2022 17:10:25 +0000 (19:10 +0200)
pom.xml
src/main/java/de/juplo/kafka/AddNumberMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/CalculateSumMessage.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 0c24f8b..0f5c319 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -36,8 +36,8 @@
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java
new file mode 100644 (file)
index 0000000..88b5d6f
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class AddNumberMessage
+{
+  private final int number;
+  private final int next;
+}
index 60f45a9..033d0cc 100644 (file)
@@ -8,7 +8,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -20,7 +20,7 @@ public class ApplicationConfiguration
 {
   @Bean
   public ApplicationRecordHandler recordHandler(
-      KafkaProducer<String, String> kafkaProducer,
+      KafkaProducer<String, Object> kafkaProducer,
       ApplicationProperties properties)
   {
     return new ApplicationRecordHandler(
@@ -70,7 +70,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
@@ -82,7 +82,11 @@ public class ApplicationConfiguration
     props.put("linger.ms", properties.getLingerMs());
     props.put("compression.type", properties.getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", JsonSerializer.class.getName());
+    props.put(JsonSerializer.TYPE_MAPPINGS,
+        "ADD:" + AddNumberMessage.class.getName() + "," +
+        "CALC:" + CalculateSumMessage.class.getName());
+
 
     return new KafkaProducer<>(props);
   }
index eae009c..8431a53 100644 (file)
@@ -11,7 +11,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 @Slf4j
 public class ApplicationRecordHandler implements RecordHandler<String, Integer>
 {
-  private final Producer<String, String> producer;
+  private final Producer<String, Object> producer;
   private final String id;
   private final String topic;
 
@@ -24,16 +24,16 @@ public class ApplicationRecordHandler implements RecordHandler<String, Integer>
 
     for (int i = 1; i <= number; i++)
     {
-      send(key, Integer.toString(i));
+      send(key, new AddNumberMessage(number, i));
     }
-    send(key, "CALCULATE");
+    send(key, new CalculateSumMessage(number));
   }
 
-  private void send(String key, String value)
+  private void send(String key, Object value)
   {
       final long time = System.currentTimeMillis();
 
-      final ProducerRecord<String, String> record = new ProducerRecord<>(
+      final ProducerRecord<String, Object> record = new ProducerRecord<>(
           topic,  // Topic
           key,    // Key
           value   // Value
diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java
new file mode 100644 (file)
index 0000000..5d8c414
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class CalculateSumMessage
+{
+  private final int number;
+}