From: Kai Moritz Date: Mon, 11 Nov 2024 20:24:25 +0000 (+0100) Subject: Version des `spring-consumer`, der JSON-Nachrichten erwartet X-Git-Tag: consumer/spring-consumer--json--2024-11-13--si X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fconsumer%2Fspring-consumer--json;p=demos%2Fkafka%2Ftraining Version des `spring-consumer`, der JSON-Nachrichten erwartet --- diff --git a/README.sh b/README.sh index b46e235..7152ec9 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-json-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6bd2766..2e40b79 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -190,7 +190,7 @@ services: - kafka-3 producer: - image: juplo/spring-producer:1.0-SNAPSHOT + image: juplo/spring-producer:1.0-json-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: producer @@ -199,7 +199,7 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/spring-consumer:1.1-json-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer diff --git a/pom.xml b/pom.xml index 98a0a36..30cda01 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + 1.1-json-SNAPSHOT 21 @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -52,11 +52,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a4856a6..3f7422c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -18,7 +19,7 @@ public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -31,7 +32,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -48,7 +49,8 @@ public class ApplicationConfiguration props.put("metadata.maxage.ms", 5000); // 5 Sekunden props.put("partition.assignment.strategy", StickyAssignor.class.getName()); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", JsonDeserializer.class.getName()); + props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum"); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f832b45..654ef37 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -15,7 +15,7 @@ public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -26,7 +26,7 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, - Consumer consumer, + Consumer consumer, Runnable closeCallback) { this.id = clientId; @@ -51,11 +51,11 @@ public class ExampleConsumer implements Runnable while (running) { - ConsumerRecords records = + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), @@ -90,7 +90,7 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - String value) + Message value) { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java new file mode 100644 index 0000000..e4999b7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Message.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + + +public abstract class Message +{ + public enum Type {ADD, CALC} + + public abstract Type getType(); +} diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java new file mode 100644 index 0000000..c024b65 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageAddNumber.java @@ -0,0 +1,19 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageAddNumber extends Message +{ + private Integer next; + + + @Override + public Type getType() + { + return Type.ADD; + } +} diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java new file mode 100644 index 0000000..afc5a39 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageCalculateSum.java @@ -0,0 +1,16 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageCalculateSum extends Message +{ + @Override + public Type getType() + { + return Type.CALC; + } +} diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java new file mode 100644 index 0000000..82116f4 --- /dev/null +++ b/src/test/java/de/juplo/kafka/MessageTest.java @@ -0,0 +1,29 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + + +public class MessageTest +{ + ObjectMapper mapper = new ObjectMapper(); + + @Test + @DisplayName("Deserialize a MessageAddNumber message") + public void testDeserializeMessageAddNumber() + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class)); + } + + @Test + @DisplayName("Deserialize a MessageCalculateSum message") + public void testDeserializeMessageCalculateSum() throws JsonProcessingException + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class)); + } +}