`spring-consumer` empfängt Nachrichten aus geteilter Lib spring/spring-consumer--json--messages--generics4all
authorKai Moritz <kai@juplo.de>
Sun, 2 Feb 2025 15:01:17 +0000 (16:01 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 24 Feb 2025 20:45:21 +0000 (21:45 +0100)
README.sh
build.gradle
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/java/de/juplo/kafka/Message.java [deleted file]
src/main/java/de/juplo/kafka/MessageAddNumber.java [deleted file]
src/main/java/de/juplo/kafka/MessageCalculateSum.java [deleted file]
src/test/java/de/juplo/kafka/MessageTest.java [deleted file]

index 7152ec9..01f39f9 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-json-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-json-messages-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
index 3ddca4b..e04996d 100644 (file)
@@ -8,7 +8,7 @@ plugins {
 }
 
 group = 'de.juplo.kafka'
-version = '1.1-json-SNAPSHOT'
+version = '1.1-json-messages-SNAPSHOT'
 
 java {
        toolchain {
@@ -24,6 +24,7 @@ configurations {
 
 repositories {
        mavenCentral()
+       mavenLocal()
 }
 
 dependencies {
@@ -31,6 +32,7 @@ dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-actuator'
        implementation 'org.springframework.boot:spring-boot-starter-validation'
        implementation 'org.springframework.boot:spring-boot-starter-web'
+       implementation 'de.juplo.messages:sumup-messages:1.0-SNAPSHOT'
        compileOnly 'org.projectlombok:lombok'
        developmentOnly 'org.springframework.boot:spring-boot-devtools'
        annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
index 9fb105f..896f4a6 100644 (file)
@@ -136,7 +136,7 @@ services:
       - kafka-3
 
   producer:
-    image: juplo/spring-producer:1.0-json-SNAPSHOT
+    image: juplo/spring-producer:1.0-json-messages-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: producer
@@ -145,21 +145,21 @@ services:
       juplo.producer.throttle-ms: 100
 
   consumer:
-    image: juplo/spring-consumer:1.1-json-SNAPSHOT
+    image: juplo/spring-consumer:1.1-json-messages-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer
       juplo.consumer.topic: test
 
   peter:
-    image: juplo/spring-consumer:1.1-json-SNAPSHOT
+    image: juplo/spring-consumer:1.1-json-messages-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: peter
       juplo.consumer.topic: test
 
   ute:
-    image: juplo/spring-consumer:1.1-json-SNAPSHOT
+    image: juplo/spring-consumer:1.1-json-messages-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: ute
diff --git a/pom.xml b/pom.xml
index 8dd99a5..add6e70 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-json-SNAPSHOT</version>
+  <version>1.1-json-messages-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
+    <sumup-messages.version>1.0-SNAPSHOT</sumup-messages.version>
   </properties>
 
   <dependencies>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
     </dependency>
+    <dependency>
+      <groupId>de.juplo.messages</groupId>
+      <artifactId>sumup-messages</artifactId>
+      <version>${sumup-messages.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
index 7aac916..1d8c916 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import de.juplo.messages.Message;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.StickyAssignor;
@@ -50,7 +51,7 @@ public class ApplicationConfiguration
     props.put("partition.assignment.strategy", StickyAssignor.class.getName());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", JsonDeserializer.class.getName());
-    props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum");
+    props.put("spring.json.trusted.packages", "de.juplo.messages");
 
     return new KafkaConsumer<>(props);
   }
index d647aa9..b61d8b2 100644 (file)
@@ -1,5 +1,8 @@
 package de.juplo.kafka;
 
+import de.juplo.messages.Message;
+import de.juplo.messages.Add;
+import de.juplo.messages.Calculate;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -95,20 +98,20 @@ public class ExampleConsumer<K, V extends Message> implements Runnable
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
     switch (value.getType())
     {
-      case ADD  -> addNumber((MessageAddNumber)value);
-      case CALC -> calcSum((MessageCalculateSum)value);
+      case ADD  -> addNumber((Add)value);
+      case CALC -> calcSum((Calculate)value);
       default   -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType());
     }
   }
 
-  private void addNumber(MessageAddNumber addNumber)
+  private void addNumber(Add add)
   {
-    log.info("{} - Adding number {}", id, addNumber.getNext());
+    log.info("{} - Adding next summand {} for number {}", id, add.getNext(), add.getNumber());
   }
 
-  private void calcSum(MessageCalculateSum calculateSum)
+  private void calcSum(Calculate   calculate)
   {
-    log.info("{} - Calculating sum", id);
+    log.info("{} - Calculating sum for number {}", id, calculate.getNumber());
   }
 
   public void shutdown() throws InterruptedException
diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java
deleted file mode 100644 (file)
index e4999b7..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka;
-
-
-public abstract class Message
-{
-  public enum Type {ADD, CALC}
-
-  public abstract Type getType();
-}
diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java
deleted file mode 100644 (file)
index c024b65..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MessageAddNumber extends Message
-{
-  private Integer next;
-
-
-  @Override
-  public Type getType()
-  {
-    return Type.ADD;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java
deleted file mode 100644 (file)
index afc5a39..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MessageCalculateSum extends Message
-{
-  @Override
-  public Type getType()
-  {
-    return Type.CALC;
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java
deleted file mode 100644 (file)
index 82116f4..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-
-public class MessageTest
-{
-  ObjectMapper mapper = new ObjectMapper();
-
-  @Test
-  @DisplayName("Deserialize a MessageAddNumber message")
-  public void testDeserializeMessageAddNumber()
-  {
-    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class));
-    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class));
-  }
-
-  @Test
-  @DisplayName("Deserialize a MessageCalculateSum message")
-  public void testDeserializeMessageCalculateSum() throws JsonProcessingException
-  {
-    Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class));
-    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class));
-  }
-}