From 9fe455e4ccd0afdad0a9f12c009aa42151c2d098 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 28 May 2022 18:27:45 +0200 Subject: [PATCH] Springify: Der Consumer kann unterschiedliche Nachrichten-Typen empfangen MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Neben `ClientMessage` gibt es jetzt auch noch den Nachrichten-Typ `Greeting`. * `ClientMessage` ist auf den Bezeichner `message` gemapped. * `Greeting` ist auf den Bezeichner `greeting` gemapped. * Für die beiden bekannten Nachrichten-Typen sind via `@KafkaHandler` separate Handler konfiguriert. * Unbekannte Nachrichten (z.B. `FooMessage`) landen zusammen mit anderen Fehlern in dem DLQ-Topic. --- .../juplo/kafka/ApplicationConfiguration.java | 21 +++++++--- .../kafka/ApplicationHealthIndicator.java | 2 +- .../java/de/juplo/kafka/EndlessConsumer.java | 38 +++++++++---------- src/main/java/de/juplo/kafka/Greeting.java | 13 +++++++ src/main/resources/application.yml | 2 +- .../java/de/juplo/kafka/ApplicationTests.java | 2 - 6 files changed, 50 insertions(+), 28 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/Greeting.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 4923b09..51e7979 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -21,14 +22,24 @@ import java.util.function.Consumer; @Configuration @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) +@Slf4j public class ApplicationConfiguration { @Bean - public Consumer> consumer() + public Consumer messageHandler() { - return (record) -> + return (message) -> { - // Handle record + log.info("Received ClientMessage: {}", message); + }; + } + + @Bean + public Consumer greetingsHandler() + { + return (greeting) -> + { + log.info("Received Greeting: {}", greeting); }; } @@ -39,7 +50,7 @@ public class ApplicationConfiguration new StringSerializer(), new DelegatingByTypeSerializer(Map.of( byte[].class, new ByteArraySerializer(), - ClientMessage.class, new JsonSerializer<>()))); + Object.class, new JsonSerializer<>()))); } @Bean @@ -66,7 +77,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) { return factory.createConsumer(); } diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index 742550e..bb18a19 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; + private final EndlessConsumer consumer; @Override diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 04a0a3a..0a864da 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; @@ -20,14 +21,21 @@ import java.util.function.Consumer; @Component @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements ConsumerAwareRebalanceListener +@KafkaListener( + id = "${spring.kafka.consumer.client-id}", + idIsGroup = false, + topics = "${consumer.topic}", + autoStartup = "false") +public class EndlessConsumer implements ConsumerAwareRebalanceListener { @Autowired private KafkaListenerEndpointRegistry registry; @Value("${spring.kafka.consumer.client-id}") String id; @Autowired - Consumer> handler; + Consumer messageHandler; + @Autowired + Consumer greetingsHandler; private long consumed = 0; @@ -81,25 +89,17 @@ public class EndlessConsumer implements ConsumerAwareRebalanceListener } - @KafkaListener( - id = "${spring.kafka.consumer.client-id}", - idIsGroup = false, - topics = "${consumer.topic}", - autoStartup = "false") - public void receive(ConsumerRecord record) + @KafkaHandler + public void receiveGreeting(Greeting greeting) { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - - handler.accept(record); + greetingsHandler.accept(greeting); + consumed++; + } + @KafkaHandler + public void receiveMessage(ClientMessage message) + { + messageHandler.accept(message); consumed++; } diff --git a/src/main/java/de/juplo/kafka/Greeting.java b/src/main/java/de/juplo/kafka/Greeting.java new file mode 100644 index 0000000..d228a70 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Greeting.java @@ -0,0 +1,13 @@ +package de.juplo.kafka; + +import lombok.Data; + +import java.time.LocalDateTime; + + +@Data +public class Greeting +{ + String name; + LocalDateTime when; +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 4b22bd2..75ec38c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -31,7 +31,7 @@ spring: value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: spring.deserializer.value.delegate.class: "org.springframework.kafka.support.serializer.JsonDeserializer" - spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage" + spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage,greeting:de.juplo.kafka.Greeting" spring.json.trusted.packages: "de.juplo.kafka" producer: bootstrap-servers: :9092 diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 7ec73f3..8a485b3 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -63,8 +63,6 @@ class ApplicationTests @Autowired KafkaProducer kafkaProducer; @Autowired - org.apache.kafka.clients.consumer.Consumer kafkaConsumer; - @Autowired KafkaConsumer offsetConsumer; @Autowired ApplicationProperties applicationProperties; -- 2.20.1