]> juplo.de Git - demos/kafka/training/commitdiff
Umbau in die lauffähige Vorlage mit Code springkafka/spring-consumer--vorlage springkafka/spring-consumer--vorlage--2026-06-lvm--rebase-vollständig
authorKai Moritz <kai@juplo.de>
Wed, 10 Jun 2026 19:58:55 +0000 (21:58 +0200)
committerKai Moritz <kai.milan.moritz@googlemail.com>
Fri, 12 Jun 2026 19:06:14 +0000 (19:06 +0000)
README.sh [deleted file]
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/java/de/juplo/kafka/Message.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/MessageAddNumber.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/MessageCalculateSum.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/MessageTest.java [new file with mode: 0644]

diff --git a/README.sh b/README.sh
deleted file mode 100755 (executable)
index b46e235..0000000
--- a/README.sh
+++ /dev/null
@@ -1,39 +0,0 @@
-#!/bin/bash
-
-IMAGE=juplo/spring-consumer:1.1-SNAPSHOT
-
-if [ "$1" = "cleanup" ]
-then
-  docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans
-  mvn clean
-  exit
-fi
-
-docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer
-
-if [[
-  $(docker image ls -q $IMAGE) == "" ||
-  "$1" = "build"
-]]
-then
-  mvn clean install || exit
-else
-  echo "Using image existing images:"
-  docker image ls $IMAGE
-fi
-
-docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
-
-
-docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer
-
-sleep 5
-docker compose -f docker/docker-compose.yml stop consumer
-
-docker compose -f docker/docker-compose.yml start consumer
-sleep 5
-
-docker compose -f docker/docker-compose.yml stop producer consumer
-docker compose -f docker/docker-compose.yml logs consumer
index 1f5a5706c875604947c467cd32611c4135c6a01e..8b411f7e82cadb080ccd2ee6401c813daef8b61a 100644 (file)
@@ -91,8 +91,24 @@ public class ExampleConsumer implements Runnable
   {
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
+    // TODO: Für eine spätere Übung...
+    // switch (value.getType())
+    // {
+    //   case ADD  -> addNumber((MessageAddNumber)value);
+    //   case CALC -> calcSum((MessageCalculateSum)value);
+    //   default   -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType());
+    // }
   }
 
+  private void addNumber(MessageAddNumber addNumber)
+  {
+    log.info("{} - Adding number {}", id, addNumber.getNext());
+  }
+
+  private void calcSum(MessageCalculateSum calculateSum)
+  {
+    log.info("{} - Calculating sum", id);
+  }
 
   public void shutdown() throws InterruptedException
   {
diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java
new file mode 100644 (file)
index 0000000..e4999b7
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+
+public abstract class Message
+{
+  public enum Type {ADD, CALC}
+
+  public abstract Type getType();
+}
diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java
new file mode 100644 (file)
index 0000000..c024b65
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MessageAddNumber extends Message
+{
+  private Integer next;
+
+
+  @Override
+  public Type getType()
+  {
+    return Type.ADD;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java
new file mode 100644 (file)
index 0000000..afc5a39
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MessageCalculateSum extends Message
+{
+  @Override
+  public Type getType()
+  {
+    return Type.CALC;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java
new file mode 100644 (file)
index 0000000..82116f4
--- /dev/null
@@ -0,0 +1,29 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+
+public class MessageTest
+{
+  ObjectMapper mapper = new ObjectMapper();
+
+  @Test
+  @DisplayName("Deserialize a MessageAddNumber message")
+  public void testDeserializeMessageAddNumber()
+  {
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class));
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class));
+  }
+
+  @Test
+  @DisplayName("Deserialize a MessageCalculateSum message")
+  public void testDeserializeMessageCalculateSum() throws JsonProcessingException
+  {
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class));
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class));
+  }
+}