From 6326d5c5f20fa24161ba1a4b7d10ce42cb9d416c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 3 Sep 2022 13:58:41 +0200 Subject: [PATCH] sumup-requests verschickt JSON-Nachrichten mit Hilfe von Spring Kafka --- pom.xml | 4 ++-- src/main/java/de/juplo/kafka/AddNumberMessage.java | 11 +++++++++++ .../de/juplo/kafka/ApplicationConfiguration.java | 12 ++++++++---- .../de/juplo/kafka/ApplicationRecordHandler.java | 10 +++++----- .../java/de/juplo/kafka/CalculateSumMessage.java | 11 +++++++++++ 5 files changed, 37 insertions(+), 11 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/AddNumberMessage.java create mode 100644 src/main/java/de/juplo/kafka/CalculateSumMessage.java diff --git a/pom.xml b/pom.xml index 0c24f8b..0f5c319 100644 --- a/pom.xml +++ b/pom.xml @@ -36,8 +36,8 @@ true - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java new file mode 100644 index 0000000..88b5d6f --- /dev/null +++ b/src/main/java/de/juplo/kafka/AddNumberMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class AddNumberMessage +{ + private final int number; + private final int next; +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 60f45a9..033d0cc 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -8,7 +8,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; - +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -20,7 +20,7 @@ public class ApplicationConfiguration { @Bean public ApplicationRecordHandler recordHandler( - KafkaProducer kafkaProducer, + KafkaProducer kafkaProducer, ApplicationProperties properties) { return new ApplicationRecordHandler( @@ -70,7 +70,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -82,7 +82,11 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getLingerMs()); props.put("compression.type", properties.getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.serializer", JsonSerializer.class.getName()); + props.put(JsonSerializer.TYPE_MAPPINGS, + "ADD:" + AddNumberMessage.class.getName() + "," + + "CALC:" + CalculateSumMessage.class.getName()); + return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index eae009c..8431a53 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -11,7 +11,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; @Slf4j public class ApplicationRecordHandler implements RecordHandler { - private final Producer producer; + private final Producer producer; private final String id; private final String topic; @@ -24,16 +24,16 @@ public class ApplicationRecordHandler implements RecordHandler for (int i = 1; i <= number; i++) { - send(key, Integer.toString(i)); + send(key, new AddNumberMessage(number, i)); } - send(key, "CALCULATE"); + send(key, new CalculateSumMessage(number)); } - private void send(String key, String value) + private void send(String key, Object value) { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key value // Value diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java new file mode 100644 index 0000000..5d8c414 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CalculateSumMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class CalculateSumMessage +{ + private final int number; +} -- 2.20.1