From a894389cc76189a3ed486493ce4aaf79c220f939 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 6 Nov 2022 20:11:41 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20die=20JSON-Version=20des=20?= =?utf8?q?Rest-Producers?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 2 -- .../java/de/juplo/kafka/ApplicationConfiguration.java | 7 ++----- src/main/java/de/juplo/kafka/RestProducer.java | 8 ++------ src/test/java/de/juplo/kafka/ApplicationTests.java | 2 +- 4 files changed, 5 insertions(+), 14 deletions(-) diff --git a/README.sh b/README.sh index 0e8d4d3..978ef4b 100755 --- a/README.sh +++ b/README.sh @@ -34,6 +34,4 @@ docker-compose up -d peter klaus sleep 10 docker-compose kill -s9 peter klaus -# tag::kafkacat[] kafkacat -b :9092 -t test -o 0 -e -f 'p=%p|o=%o|k=%k|h=%h|v=%s\n' -# end::kafkacat[] diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 9a11f6e..f1d773e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,11 +1,11 @@ package de.juplo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; @@ -40,10 +40,7 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getLingerMs()); props.put("compression.type", properties.getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", JsonSerializer.class.getName()); - props.put(JsonSerializer.TYPE_MAPPINGS, - "ADD:" + AddNumberMessage.class.getName() + "," + - "CALC:" + CalculateSumMessage.class.getName()); + props.put("value.serializer", IntegerSerializer.class.getName()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 4be2dcd..2e0da97 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -30,13 +30,9 @@ public class RestProducer @RequestHeader(name = "X-id", required = false) Long correlationId, @RequestBody Integer number) { - ResultRecorder result = new ResultRecorder(number+1); + ResultRecorder result = new ResultRecorder(1); - for (int i = 1; i <= number; i++) - { - send(key, new AddNumberMessage(number, i), correlationId, result); - } - send(key, new CalculateSumMessage(number), correlationId, result); + send(key, number, correlationId, result); return result.getDeferredResult(); } diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index b9c1e17..5844761 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -62,7 +62,7 @@ public class ApplicationTests .andExpect(status().isOk()); await("Message was send") .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() == 667); + .until(() -> consumer.received.size() == 1); } -- 2.20.1