From: Kai Moritz Date: Sun, 12 Jun 2022 13:14:49 +0000 (+0200) Subject: Springify: Der Payload ist eine als JSON gerenderte Klasse X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=39d1bd1fc7b641255efd598ca4f037bd529d5fe8;p=demos%2Fkafka%2Ftraining Springify: Der Payload ist eine als JSON gerenderte Klasse * Als Nachricht wird eine Instanz der Klasse `ClientMessage` verschickt * Die Instanz wird mit Hilfe des `JsonSerializer` von Spring Kafka serialisiert. --- diff --git a/pom.xml b/pom.xml index b736152..295d1f4 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - rest-producer - REST Producer - A Simple Producer that takes messages via POST and confirms successs + springified-producer + Springified REST Producer + A Simple Producer that is implemented with the help of Spring Kafka and takes messages via POST and confirms successs 1.0-SNAPSHOT @@ -35,6 +35,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.projectlombok lombok @@ -44,11 +48,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ClientMessage.java b/src/main/java/de/juplo/kafka/ClientMessage.java new file mode 100644 index 0000000..042fdc4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ClientMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class ClientMessage +{ + private final String client; + private final String message; +} diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 7d9bf12..e564a66 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -5,14 +5,13 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PreDestroy; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; @Slf4j @@ -21,7 +20,7 @@ public class RestProducer { private final String id; private final String topic; - private final KafkaProducer producer; + private final KafkaProducer producer; private long produced = 0; @@ -40,7 +39,7 @@ public class RestProducer props.put("linger.ms", properties.getLingerMs()); props.put("compression.type", properties.getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.serializer", JsonSerializer.class.getName()); this.producer = new KafkaProducer<>(props); } @@ -54,10 +53,10 @@ public class RestProducer final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key - value // Value + new ClientMessage(key, value) // Value ); producer.send(record, (metadata, e) -> diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index cf70c81..d872c2f 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -51,7 +51,7 @@ public class ApplicationTests @Test - void testSendMessage() throws Exception + void testSendClientMessage() throws Exception { mockMvc .perform(post("/peter").content("Hallo Welt!"))