From fbd2fe7121eea8ce0c25d53dcb80b2565ccc8598 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 7 Nov 2022 21:44:40 +0100 Subject: [PATCH] Spring-Kafka Version des Simple-Consumer auf JSON-Nachrichten umgestellt --- README.sh | 27 ++++++++++------ docker-compose.yml | 32 +++++++++++++++++-- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 2 +- src/main/java/de/juplo/kafka/Message.java | 9 ++++++ .../java/de/juplo/kafka/MessageAddNumber.java | 19 +++++++++++ .../de/juplo/kafka/MessageCalculateSum.java | 16 ++++++++++ .../java/de/juplo/kafka/SimpleConsumer.java | 8 ++--- src/main/resources/application.yml | 5 ++- src/test/java/de/juplo/kafka/MessageTest.java | 29 +++++++++++++++++ 10 files changed, 130 insertions(+), 19 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/Message.java create mode 100644 src/main/java/de/juplo/kafka/MessageAddNumber.java create mode 100644 src/main/java/de/juplo/kafka/MessageCalculateSum.java create mode 100644 src/test/java/de/juplo/kafka/MessageTest.java diff --git a/README.sh b/README.sh index 53d1aa7..feaa66a 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.0-SNAPSHOT +IMAGE=juplo/spring-consumer-json:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -18,6 +18,7 @@ if [[ "$1" = "build" ]] then + docker-compose rm -svf consumer-1 consumer-2 mvn clean install || exit else echo "Using image existing images:" @@ -25,12 +26,18 @@ else fi docker-compose up setup -docker-compose up -d producer - -mvn spring-boot:run & -sleep 10 -kill $(jobs -p) -mvn spring-boot:run & -sleep 10 -docker-compose stop producer -kill $(jobs -p) +docker-compose up -d producer consumer-1 consumer-2 + +while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done +while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done +while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done + +echo 6 | http -v :8080/peter +echo 77 | http -v :8080/klaus +echo 66 | http -v :8080/peter +echo 7 | http -v :8080/klaus + +docker-compose stop consumer-1 consumer-2 + +docker-compose logs consumer-1 +docker-compose logs consumer-2 diff --git a/docker-compose.yml b/docker-compose.yml index bd15793..f6dbef4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -103,5 +103,33 @@ services: command: sleep infinity producer: - image: juplo/simple-producer:1.0-SNAPSHOT - command: producer + image: juplo/rest-producer-json:1.0-SNAPSHOT + ports: + - 8080:8080 + environment: + server.port: 8080 + producer.bootstrap-server: kafka:9092 + producer.client-id: producer + producer.topic: test + + consumer-1: + image: juplo/spring-consumer-json:1.0-SNAPSHOT + ports: + - 8081:8080 + environment: + server.port: 8080 + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: consumer-1 + spring.kafka.consumer.group-id: my-group + simple.consumer.topic: test + + consumer-2: + image: juplo/spring-consumer-json:1.0-SNAPSHOT + ports: + - 8082:8080 + environment: + server.port: 8080 + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: consumer-2 + spring.kafka.consumer.group-id: my-group + simple.consumer.topic: test diff --git a/pom.xml b/pom.xml index cdfb199..ebf6eb1 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ de.juplo.kafka - spring-consumer + spring-consumer-json 1.0-SNAPSHOT Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 62d61a2..a8b3e1d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -15,7 +15,7 @@ public class ApplicationConfiguration { @Bean public SimpleConsumer simpleConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java new file mode 100644 index 0000000..e4999b7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Message.java @@ -0,0 +1,9 @@ +package de.juplo.kafka; + + +public abstract class Message +{ + public enum Type {ADD, CALC} + + public abstract Type getType(); +} diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java new file mode 100644 index 0000000..c024b65 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageAddNumber.java @@ -0,0 +1,19 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageAddNumber extends Message +{ + private Integer next; + + + @Override + public Type getType() + { + return Type.ADD; + } +} diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java new file mode 100644 index 0000000..afc5a39 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageCalculateSum.java @@ -0,0 +1,16 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageCalculateSum extends Message +{ + @Override + public Type getType() + { + return Type.CALC; + } +} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 1cf9b22..3b0202f 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -17,7 +17,7 @@ public class SimpleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private long consumed = 0; @@ -32,11 +32,11 @@ public class SimpleConsumer implements Runnable while (true) { - ConsumerRecords records = + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), @@ -69,7 +69,7 @@ public class SimpleConsumer implements Runnable Integer partition, Long offset, String key, - String value) + Message value) { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index d524e5f..07d0625 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -30,10 +30,13 @@ spring: auto-offset-reset: earliest auto-commit-interval: 5s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor metadata.max.age.ms: 1000 + spring.json.type.mapping: > + ADD:de.juplo.kafka.MessageAddNumber, + CALC:de.juplo.kafka.MessageCalculateSum logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java new file mode 100644 index 0000000..82116f4 --- /dev/null +++ b/src/test/java/de/juplo/kafka/MessageTest.java @@ -0,0 +1,29 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + + +public class MessageTest +{ + ObjectMapper mapper = new ObjectMapper(); + + @Test + @DisplayName("Deserialize a MessageAddNumber message") + public void testDeserializeMessageAddNumber() + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class)); + } + + @Test + @DisplayName("Deserialize a MessageCalculateSum message") + public void testDeserializeMessageCalculateSum() throws JsonProcessingException + { + Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class)); + Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class)); + } +} -- 2.20.1