Springify: Der Payload ist eine als JSON gerenderte Klasse
authorKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 13:14:49 +0000 (15:14 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 13:23:09 +0000 (15:23 +0200)
* Als Nachricht wird eine Instanz der Klasse `ClientMessage` verschickt
* Die Instanz wird mit Hilfe des `JsonSerializer` von Spring Kafka
  serialisiert.

pom.xml
src/main/java/de/juplo/kafka/ClientMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/RestProducer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index b736152..295d1f4 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>rest-producer</artifactId>
-  <name>REST Producer</name>
-  <description>A Simple Producer that takes messages via POST and confirms successs</description>
+  <artifactId>springified-producer</artifactId>
+  <name>Springified REST Producer</name>
+  <description>A Simple Producer that is implemented with the help of Spring Kafka and takes messages via POST and confirms successs</description>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka-test</artifactId>
diff --git a/src/main/java/de/juplo/kafka/ClientMessage.java b/src/main/java/de/juplo/kafka/ClientMessage.java
new file mode 100644 (file)
index 0000000..042fdc4
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class ClientMessage
+{
+  private final String client;
+  private final String message;
+}
index 7d9bf12..e564a66 100644 (file)
@@ -5,14 +5,13 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.annotation.PreDestroy;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 
 
 @Slf4j
@@ -21,7 +20,7 @@ public class RestProducer
 {
   private final String id;
   private final String topic;
-  private final KafkaProducer<String, String> producer;
+  private final KafkaProducer<String, ClientMessage> producer;
 
   private long produced = 0;
 
@@ -40,7 +39,7 @@ public class RestProducer
     props.put("linger.ms", properties.getLingerMs());
     props.put("compression.type", properties.getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", JsonSerializer.class.getName());
 
     this.producer = new KafkaProducer<>(props);
   }
@@ -54,10 +53,10 @@ public class RestProducer
 
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
+    final ProducerRecord<String, ClientMessage> record = new ProducerRecord<>(
         topic,  // Topic
         key,    // Key
-        value   // Value
+        new ClientMessage(key, value) // Value
     );
 
     producer.send(record, (metadata, e) ->
index cf70c81..d872c2f 100644 (file)
@@ -51,7 +51,7 @@ public class ApplicationTests
 
 
        @Test
-       void testSendMessage() throws Exception
+       void testSendClientMessage() throws Exception
        {
                mockMvc
                                .perform(post("/peter").content("Hallo Welt!"))