From fc1ead76d70df96b6df3dd3d756354a905ef227d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 2 Feb 2025 16:01:17 +0100 Subject: [PATCH] =?utf8?q?`spring-consumer`=20empf=C3=A4ngt=20Nachrichten?= =?utf8?q?=20aus=20geteilter=20Lib?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 2 +- build.gradle | 4 ++- docker/docker-compose.yml | 8 ++--- pom.xml | 8 ++++- .../juplo/kafka/ApplicationConfiguration.java | 3 +- .../java/de/juplo/kafka/ExampleConsumer.java | 15 ++++++---- src/main/java/de/juplo/kafka/Message.java | 9 ------ .../java/de/juplo/kafka/MessageAddNumber.java | 19 ------------ .../de/juplo/kafka/MessageCalculateSum.java | 16 ---------- src/test/java/de/juplo/kafka/MessageTest.java | 29 ------------------- 10 files changed, 26 insertions(+), 87 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/Message.java delete mode 100644 src/main/java/de/juplo/kafka/MessageAddNumber.java delete mode 100644 src/main/java/de/juplo/kafka/MessageCalculateSum.java delete mode 100644 src/test/java/de/juplo/kafka/MessageTest.java diff --git a/README.sh b/README.sh index 7152ec9d..01f39f9c 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-json-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-json-messages-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 3ddca4bb..e04996db 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-json-SNAPSHOT' +version = '1.1-json-messages-SNAPSHOT' java { toolchain { @@ -24,6 +24,7 @@ configurations { repositories { mavenCentral() + mavenLocal() } dependencies { @@ -31,6 +32,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' + implementation 'de.juplo.messages:sumup-messages:1.0-SNAPSHOT' compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 9fb105f3..896f4a6c 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -136,7 +136,7 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-json-SNAPSHOT + image: juplo/spring-producer:1.0-json-messages-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer @@ -145,21 +145,21 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-json-SNAPSHOT + image: juplo/spring-consumer:1.1-json-messages-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-json-SNAPSHOT + image: juplo/spring-consumer:1.1-json-messages-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: peter juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-json-SNAPSHOT + image: juplo/spring-consumer:1.1-json-messages-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: ute diff --git a/pom.xml b/pom.xml index 8dd99a57..add6e70d 100644 --- a/pom.xml +++ b/pom.xml @@ -15,10 +15,11 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-json-SNAPSHOT + 1.1-json-messages-SNAPSHOT 21 + 1.0-SNAPSHOT @@ -43,6 +44,11 @@ org.springframework.kafka spring-kafka + + de.juplo.messages + sumup-messages + ${sumup-messages.version} + org.projectlombok lombok diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 7aac916d..1d8c9162 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import de.juplo.messages.Message; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; @@ -50,7 +51,7 @@ public class ApplicationConfiguration props.put("partition.assignment.strategy", StickyAssignor.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", JsonDeserializer.class.getName()); - props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum"); + props.put("spring.json.trusted.packages", "de.juplo.messages"); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index d647aa97..b61d8b29 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,5 +1,8 @@ package de.juplo.kafka; +import de.juplo.messages.Message; +import de.juplo.messages.Add; +import de.juplo.messages.Calculate; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -95,20 +98,20 @@ public class ExampleConsumer implements Runnable log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); switch (value.getType()) { - case ADD -> addNumber((MessageAddNumber)value); - case CALC -> calcSum((MessageCalculateSum)value); + case ADD -> addNumber((Add)value); + case CALC -> calcSum((Calculate)value); default -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType()); } } - private void addNumber(MessageAddNumber addNumber) + private void addNumber(Add add) { - log.info("{} - Adding number {}", id, addNumber.getNext()); + log.info("{} - Adding next summand {} for number {}", id, add.getNext(), add.getNumber()); } - private void calcSum(MessageCalculateSum calculateSum) + private void calcSum(Calculate calculate) { - log.info("{} - Calculating sum", id); + log.info("{} - Calculating sum for number {}", id, calculate.getNumber()); } public void shutdown() throws InterruptedException diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java deleted file mode 100644 index e4999b77..00000000 --- a/src/main/java/de/juplo/kafka/Message.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.juplo.kafka; - - -public abstract class Message -{ - public enum Type {ADD, CALC} - - public abstract Type getType(); -} diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java deleted file mode 100644 index c024b65d..00000000 --- a/src/main/java/de/juplo/kafka/MessageAddNumber.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class MessageAddNumber extends Message -{ - private Integer next; - - - @Override - public Type getType() - { - return Type.ADD; - } -} diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java deleted file mode 100644 index afc5a393..00000000 --- a/src/main/java/de/juplo/kafka/MessageCalculateSum.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class MessageCalculateSum extends Message -{ - @Override - public Type getType() - { - return Type.CALC; - } -} diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java deleted file mode 100644 index 82116f43..00000000 --- a/src/test/java/de/juplo/kafka/MessageTest.java +++ /dev/null @@ -1,29 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - - -public class MessageTest -{ - ObjectMapper mapper = new ObjectMapper(); - - @Test - @DisplayName("Deserialize a MessageAddNumber message") - public void testDeserializeMessageAddNumber() - { - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class)); - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class)); - } - - @Test - @DisplayName("Deserialize a MessageCalculateSum message") - public void testDeserializeMessageCalculateSum() throws JsonProcessingException - { - Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class)); - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class)); - } -} -- 2.20.1