From: Kai Moritz Date: Mon, 11 Nov 2024 20:33:58 +0000 (+0100) Subject: Version des `spring-producer`, der JSON-Nachrichten sendet X-Git-Tag: spring/spring-producer--json--2026-03--vor-branchumbenennung--springframework~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=d008d1e2165dd0af5c6b6d87dfac7503255e776e;p=demos%2Fkafka%2Ftraining Version des `spring-producer`, der JSON-Nachrichten sendet --- diff --git a/README.sh b/README.sh index 1d208d53..3b52a5dd 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:2.0-SNAPSHOT +IMAGE=juplo/spring-producer:2.0-json-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 60118b63..3e56babb 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '2.0-SNAPSHOT' +version = '2.0-json-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index d151dab5..5be08a7b 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -173,7 +173,7 @@ services: - kafka-3 producer: - image: juplo/spring-producer:2.0-SNAPSHOT + image: juplo/spring-producer:2.0-json-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: producer diff --git a/pom.xml b/pom.xml index 9e11d624..a58adf6e 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Producer, based on Spring Boot, that sends messages via Kafka - 2.0-SNAPSHOT + 2.0-json-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java new file mode 100644 index 00000000..deb6350e --- /dev/null +++ b/src/main/java/de/juplo/kafka/AddNumberMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class AddNumberMessage implements SumupMessage +{ + private final int number; + private final int next; +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index efdfafa1..e212a253 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -19,7 +19,7 @@ public class ApplicationConfiguration public ExampleProducer exampleProducer( @Value("${spring.kafka.client-id}") String clientId, ApplicationProperties properties, - Producer kafkaProducer, + Producer kafkaProducer, ConfigurableApplicationContext applicationContext) { return diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java new file mode 100644 index 00000000..6aa0121a --- /dev/null +++ b/src/main/java/de/juplo/kafka/CalculateSumMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class CalculateSumMessage implements SumupMessage +{ + private final int number; +} diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 93d0d173..04a0a992 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -13,7 +13,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final Producer producer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,7 +25,7 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + Producer producer, Runnable closeCallback) { this.id = id; @@ -43,13 +43,18 @@ public class ExampleProducer implements Runnable @Override public void run() { - long i = 0; + int i = 0; try { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); + int number = i % 10; + SumupMessage message = (i % 7 == 0) + ? new CalculateSumMessage(number) + : new AddNumberMessage(number, i); + + send(Long.toString(number), message); if (throttle.isPositive()) { @@ -78,11 +83,11 @@ public class ExampleProducer implements Runnable } } - void send(String key, String value) + void send(String key, SumupMessage value) { final long sendRequested = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key value // Value diff --git a/src/main/java/de/juplo/kafka/SumupMessage.java b/src/main/java/de/juplo/kafka/SumupMessage.java new file mode 100644 index 00000000..739efd1e --- /dev/null +++ b/src/main/java/de/juplo/kafka/SumupMessage.java @@ -0,0 +1,5 @@ +package de.juplo.kafka; + +public interface SumupMessage +{ +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3f6c233a..12dc5e8a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -11,7 +11,11 @@ spring: buffer-memory: 33554432 batch-size: 16384 compression-type: gzip + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: + spring.json.type.mapping: >- + ADD:de.juplo.kafka.AddNumberMessage, + CALC:de.juplo.kafka.CalculateSumMessage metadata.max.age.ms: 5000 request.timeout.ms: 5000 delivery.timeout.ms: 10000