From 647949b144608424ab2538e12af143620a464a9d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 11 Nov 2024 21:24:25 +0100 Subject: [PATCH] Version des `spring-consumer`, der JSON-Nachrichten erwartet MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Fachliche Behandlung der unterschiedlichen Nachrichten-Typen ergänzt --- README.sh | 2 +- build.gradle | 5 ++-- docker/docker-compose.yml | 8 ++--- pom.xml | 11 ++----- .../juplo/kafka/ApplicationConfiguration.java | 8 +++-- .../java/de/juplo/kafka/ExampleConsumer.java | 25 ++++++++++++---- src/main/java/de/juplo/kafka/Message.java | 9 ++++++ .../java/de/juplo/kafka/MessageAddNumber.java | 19 ++++++++++++ .../de/juplo/kafka/MessageCalculateSum.java | 16 ++++++++++ src/test/java/de/juplo/kafka/MessageTest.java | 29 +++++++++++++++++++ 10 files changed, 108 insertions(+), 24 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/Message.java create mode 100644 src/main/java/de/juplo/kafka/MessageAddNumber.java create mode 100644 src/main/java/de/juplo/kafka/MessageCalculateSum.java create mode 100644 src/test/java/de/juplo/kafka/MessageTest.java diff --git a/README.sh b/README.sh index b46e2350..7152ec9d 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-json-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index a8614fdf..3ddca4bb 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-SNAPSHOT' +version = '1.1-json-SNAPSHOT' java { toolchain { @@ -27,7 +27,7 @@ repositories { } dependencies { - implementation 'org.apache.kafka:kafka-clients' + implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' @@ -36,7 +36,6 @@ dependencies { annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4fa2eade..9fb105f3 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,7 +136,7 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-json-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer @@ -145,21 +145,21 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-json-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-json-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: peter juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-json-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: ute diff --git a/pom.xml b/pom.xml index dd96d00f..8dd99a57 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-json-SNAPSHOT 21 @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,11 +53,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index b98c401d..33022bf6 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -18,7 +19,7 @@ public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -31,7 +32,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -48,7 +49,8 @@ public class ApplicationConfiguration props.put("metadata.maxage.ms", 5000); // 5 Sekunden props.put("partition.assignment.strategy", StickyAssignor.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", JsonDeserializer.class.getName()); + props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum"); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 1f5a5706..a03fece4 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -15,7 +15,7 @@ public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,7 +25,7 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, - Consumer consumer, + Consumer consumer, Runnable closeCallback) { this.id = clientId; @@ -49,10 +49,10 @@ public class ExampleConsumer implements Runnable while (true) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), @@ -87,12 +87,27 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - String value) + Message value) { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); + switch (value.getType()) + { + case ADD -> addNumber((MessageAddNumber)value); + case CALC -> calcSum((MessageCalculateSum)value); + default -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType()); + } } + private void addNumber(MessageAddNumber addNumber) + { + log.info("{} - Adding number {}", id, addNumber.getNext()); + } + + private void calcSum(MessageCalculateSum calculateSum) + { + log.info("{} - Calculating sum", id); + } public void shutdown() throws InterruptedException { diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java new file mode 100644 index 00000000..e4999b77 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Message.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + + +public abstract class Message +{ + public enum Type {ADD, CALC} + + public abstract Type getType(); +} diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java new file mode 100644 index 00000000..c024b65d --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageAddNumber.java @@ -0,0 +1,19 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageAddNumber extends Message +{ + private Integer next; + + + @Override + public Type getType() + { + return Type.ADD; + } +} diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java new file mode 100644 index 00000000..afc5a393 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageCalculateSum.java @@ -0,0 +1,16 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageCalculateSum extends Message +{ + @Override + public Type getType() + { + return Type.CALC; + } +} diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java new file mode 100644 index 00000000..82116f43 --- /dev/null +++ b/src/test/java/de/juplo/kafka/MessageTest.java @@ -0,0 +1,29 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + + +public class MessageTest +{ + ObjectMapper mapper = new ObjectMapper(); + + @Test + @DisplayName("Deserialize a MessageAddNumber message") + public void testDeserializeMessageAddNumber() + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class)); + } + + @Test + @DisplayName("Deserialize a MessageCalculateSum message") + public void testDeserializeMessageCalculateSum() throws JsonProcessingException + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class)); + } +} -- 2.20.1