Springify: Can send multiple types of messages
authorKai Moritz <kai@juplo.de>
Sat, 28 May 2022 14:55:44 +0000 (16:55 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 13:25:57 +0000 (15:25 +0200)
* `ClientMessage` is send as `message`
* `Greeting` is send as `greeting`

src/main/java/de/juplo/kafka/Greeting.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/RestProducer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/src/main/java/de/juplo/kafka/Greeting.java b/src/main/java/de/juplo/kafka/Greeting.java
new file mode 100644 (file)
index 0000000..2df15a3
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+import java.time.LocalDateTime;
+
+
+@Value
+public class Greeting
+{
+  private final String name;
+  private final LocalDateTime when = LocalDateTime.now();
+}
index bccc166..6c852ce 100644 (file)
@@ -20,7 +20,7 @@ public class RestProducer
 {
   private final String id;
   private final String topic;
-  private final KafkaProducer<String, ClientMessage> producer;
+  private final KafkaProducer<String, Object> producer;
 
   private long produced = 0;
 
@@ -40,26 +40,46 @@ public class RestProducer
     props.put("compression.type", properties.getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
     props.put("value.serializer", JsonSerializer.class.getName());
-    props.put(JsonSerializer.TYPE_MAPPINGS, "message:" + ClientMessage.class.getName());
+    props.put(JsonSerializer.TYPE_MAPPINGS,
+        "message:" + ClientMessage.class.getName() + "," +
+        "greeting:" + Greeting.class.getName());
 
     this.producer = new KafkaProducer<>(props);
   }
 
   @PostMapping(path = "{key}")
-  public DeferredResult<ProduceResult> send(
+  public DeferredResult<ProduceResult> message(
       @PathVariable String key,
       @RequestBody String value)
   {
-    DeferredResult<ProduceResult> result = new DeferredResult<>();
-
-    final long time = System.currentTimeMillis();
-
-    final ProducerRecord<String, ClientMessage> record = new ProducerRecord<>(
+    final ProducerRecord<String, Object> record = new ProducerRecord<>(
         topic,  // Topic
         key,    // Key
         new ClientMessage(key, value) // Value
     );
 
+    return send(record);
+  }
+
+  @PostMapping(path = "/")
+  public DeferredResult<ProduceResult> greeting(
+      @RequestBody String name)
+  {
+    final ProducerRecord<String, Object> record = new ProducerRecord<>(
+        topic,  // Topic
+        name,    // Key
+        new Greeting(name) // Value
+    );
+
+    return send(record);
+  }
+
+  private DeferredResult<ProduceResult> send(ProducerRecord<String, Object> record)
+  {
+    DeferredResult<ProduceResult> result = new DeferredResult<>();
+
+    final long time = System.currentTimeMillis();
+
     producer.send(record, (metadata, e) ->
     {
       long now = System.currentTimeMillis();
@@ -96,9 +116,8 @@ public class RestProducer
 
     long now = System.currentTimeMillis();
     log.trace(
-        "{} - Queued #{} key={} latency={}ms",
+        "{} - Queued key={} latency={}ms",
         id,
-        value,
         record.key(),
         now - time
     );
index d872c2f..71625fd 100644 (file)
@@ -61,6 +61,17 @@ public class ApplicationTests
                                .until(() -> consumer.received.size() == 1);
        }
 
+       @Test
+       void testSendGreeting() throws Exception
+       {
+               mockMvc
+                               .perform(post("/").content("peter"))
+                               .andExpect(status().isOk());
+               await("Message was send")
+                               .atMost(Duration.ofSeconds(5))
+                               .until(() -> consumer.received.size() == 1);
+       }
+
 
        static class Consumer
        {