Vorlage für die JSON-Version des Rest-Producers
authorKai Moritz <kai@juplo.de>
Sun, 6 Nov 2022 19:11:41 +0000 (20:11 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 31 Jan 2023 16:40:51 +0000 (17:40 +0100)
README.sh
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/RestProducer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 8451679..e5321d5 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -31,6 +31,4 @@ while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Wait
 
 echo -n 3 | http -v :8080/foo;
 
-# tag::kafkacat[]
 kafkacat -b :9092 -t test -o 0 -e -f 'p=%p|o=%o|k=%k|h=%h|v=%s\n'
-# end::kafkacat[]
index 9a11f6e..f1d773e 100644 (file)
@@ -1,11 +1,11 @@
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Properties;
 
@@ -40,10 +40,7 @@ public class ApplicationConfiguration
     props.put("linger.ms", properties.getLingerMs());
     props.put("compression.type", properties.getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", JsonSerializer.class.getName());
-    props.put(JsonSerializer.TYPE_MAPPINGS,
-        "ADD:" + AddNumberMessage.class.getName() + "," +
-        "CALC:" + CalculateSumMessage.class.getName());
+    props.put("value.serializer", IntegerSerializer.class.getName());
 
     return new KafkaProducer<>(props);
   }
index 4be2dcd..2e0da97 100644 (file)
@@ -30,13 +30,9 @@ public class RestProducer
       @RequestHeader(name = "X-id", required = false) Long correlationId,
       @RequestBody Integer number)
   {
-    ResultRecorder result = new ResultRecorder(number+1);
+    ResultRecorder result = new ResultRecorder(1);
 
-    for (int i = 1; i <= number; i++)
-    {
-      send(key, new AddNumberMessage(number, i), correlationId, result);
-    }
-    send(key, new CalculateSumMessage(number), correlationId, result);
+    send(key, number, correlationId, result);
 
     return result.getDeferredResult();
   }
index b9c1e17..5844761 100644 (file)
@@ -62,7 +62,7 @@ public class ApplicationTests
                                .andExpect(status().isOk());
                await("Message was send")
                                .atMost(Duration.ofSeconds(5))
-                               .until(() -> consumer.received.size() == 667);
+                               .until(() -> consumer.received.size() == 1);
        }