From 917ddc7b598e1c7e361dbba8192e304962367952 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 4 Nov 2022 12:59:20 +0100 Subject: [PATCH] SimpleConsumer auf JSON-Nachrichten umgestellt --- src/main/java/de/juplo/kafka/Message.java | 9 ++++++ .../java/de/juplo/kafka/MessageAddNumber.java | 19 ++++++++++++ .../de/juplo/kafka/MessageCalculateSum.java | 16 ++++++++++ .../java/de/juplo/kafka/SimpleConsumer.java | 6 ++-- src/main/resources/application.yml | 5 +++- src/test/java/de/juplo/kafka/MessageTest.java | 29 +++++++++++++++++++ 6 files changed, 80 insertions(+), 4 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/Message.java create mode 100644 src/main/java/de/juplo/kafka/MessageAddNumber.java create mode 100644 src/main/java/de/juplo/kafka/MessageCalculateSum.java create mode 100644 src/test/java/de/juplo/kafka/MessageTest.java diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java new file mode 100644 index 0000000..e4999b7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Message.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + + +public abstract class Message +{ + public enum Type {ADD, CALC} + + public abstract Type getType(); +} diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java new file mode 100644 index 0000000..c024b65 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageAddNumber.java @@ -0,0 +1,19 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageAddNumber extends Message +{ + private Integer next; + + + @Override + public Type getType() + { + return Type.ADD; + } +} diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java new file mode 100644 index 0000000..afc5a39 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageCalculateSum.java @@ -0,0 +1,16 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageCalculateSum extends Message +{ + @Override + public Type getType() + { + return Type.CALC; + } +} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 0e37686..7f94371 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -17,7 +17,7 @@ public class SimpleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private long consumed = 0; @@ -32,11 +32,11 @@ public class SimpleConsumer implements Runnable while (true) { - ConsumerRecords records = + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { consumed++; log.info( diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d524e5f..07d0625 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,10 +30,13 @@ spring: auto-offset-reset: earliest auto-commit-interval: 5s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor metadata.max.age.ms: 1000 + spring.json.type.mapping: > + ADD:de.juplo.kafka.MessageAddNumber, + CALC:de.juplo.kafka.MessageCalculateSum logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java new file mode 100644 index 0000000..82116f4 --- /dev/null +++ b/src/test/java/de/juplo/kafka/MessageTest.java @@ -0,0 +1,29 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + + +public class MessageTest +{ + ObjectMapper mapper = new ObjectMapper(); + + @Test + @DisplayName("Deserialize a MessageAddNumber message") + public void testDeserializeMessageAddNumber() + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class)); + } + + @Test + @DisplayName("Deserialize a MessageCalculateSum message") + public void testDeserializeMessageCalculateSum() throws JsonProcessingException + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class)); + } +} -- 2.20.1