From 401f51678bb0a797a7a937e2da02eac195243ac6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Nov 2024 21:33:58 +0100 Subject: [PATCH] Version des `spring-producer`, der JSON-Nachrichten sendet --- README.sh | 2 +- build.gradle | 5 ++-- docker/docker-compose.yml | 23 +++++++++++++------ pom.xml | 11 +++------ .../java/de/juplo/kafka/AddNumberMessage.java | 11 +++++++++ .../juplo/kafka/ApplicationConfiguration.java | 8 ++++--- .../de/juplo/kafka/CalculateSumMessage.java | 11 +++++++++ .../java/de/juplo/kafka/ExampleProducer.java | 17 +++++++++----- .../java/de/juplo/kafka/SumupMessage.java | 5 ++++ 9 files changed, 65 insertions(+), 28 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/AddNumberMessage.java create mode 100644 src/main/java/de/juplo/kafka/CalculateSumMessage.java create mode 100644 src/main/java/de/juplo/kafka/SumupMessage.java diff --git a/README.sh b/README.sh index c8a0b221..982f7bd3 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-producer:1.0-SNAPSHOT +IMAGE=juplo/spring-producer:1.0-json-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 1429c4dd..7556511b 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.0-SNAPSHOT' +version = '1.0-json-SNAPSHOT' java { toolchain { @@ -27,16 +27,15 @@ repositories { } dependencies { - implementation 'org.apache.kafka:kafka-clients' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' + implementation 'org.springframework.kafka:spring-kafka' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 00f68fcc..2e623598 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,23 +136,32 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-json-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer juplo.producer.topic: test consumer: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer + image: juplo/spring-consumer:1.1-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: consumer + juplo.consumer.topic: test peter: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group peter + image: juplo/spring-consumer:1.1-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: peter + juplo.consumer.topic: test ute: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group ute + image: juplo/spring-consumer:1.1-SNAPSHOT + environment: + juplo.bootstrap-server: kafka:9092 + juplo.client-id: ute + juplo.consumer.topic: test volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index f64266b4..7bcb24cd 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-producer Spring Producer A Simple Producer, based on Spring Boot, that sends messages via Kafka - 1.0-SNAPSHOT + 1.0-json-SNAPSHOT 21 @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,11 +53,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java new file mode 100644 index 00000000..deb6350e --- /dev/null +++ b/src/main/java/de/juplo/kafka/AddNumberMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class AddNumberMessage implements SumupMessage +{ + private final int number; + private final int next; +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 7540dd33..d5ce01a1 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -7,6 +7,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.time.Duration; import java.util.Properties; @@ -19,7 +20,7 @@ public class ApplicationConfiguration @Bean public ExampleProducer exampleProducer( ApplicationProperties properties, - Producer kafkaProducer, + Producer kafkaProducer, ConfigurableApplicationContext applicationContext) { return @@ -34,7 +35,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -49,7 +50,8 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getProducerProperties().getLinger().toMillis()); props.put("compression.type", properties.getProducerProperties().getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.serializer", JsonSerializer.class.getName()); + props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.AddNumberMessage,CALC:de.juplo.kafka.CalculateSumMessage"); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java new file mode 100644 index 00000000..6aa0121a --- /dev/null +++ b/src/main/java/de/juplo/kafka/CalculateSumMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class CalculateSumMessage implements SumupMessage +{ + private final int number; +} diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 3fcd05d1..7f275755 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -13,7 +13,7 @@ public class ExampleProducer implements Runnable private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final Producer producer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,7 +25,7 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + Producer producer, Runnable closeCallback) { this.id = id; @@ -43,13 +43,18 @@ public class ExampleProducer implements Runnable @Override public void run() { - long i = 0; + int i = 0; try { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); + int number = i % 10; + SumupMessage message = (i % 7 == 0) + ? new CalculateSumMessage(number) + : new AddNumberMessage(number, i); + + send(Long.toString(number), message); if (throttle.isPositive()) { @@ -78,11 +83,11 @@ public class ExampleProducer implements Runnable } } - void send(String key, String value) + void send(String key, SumupMessage value) { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key value // Value diff --git a/src/main/java/de/juplo/kafka/SumupMessage.java b/src/main/java/de/juplo/kafka/SumupMessage.java new file mode 100644 index 00000000..739efd1e --- /dev/null +++ b/src/main/java/de/juplo/kafka/SumupMessage.java @@ -0,0 +1,5 @@ +package de.juplo.kafka; + +public interface SumupMessage +{ +} -- 2.20.1