Springify: Der Consumer kann unterschiedliche Nachrichten-Typen empfangen
authorKai Moritz <kai@juplo.de>
Sat, 28 May 2022 16:27:45 +0000 (18:27 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 5 Jun 2022 12:14:46 +0000 (14:14 +0200)
* Neben `ClientMessage` gibt es jetzt auch noch den Nachrichten-Typ
  `Greeting`.
* `ClientMessage` ist auf den Bezeichner `message` gemapped.
* `Greeting` ist auf den Bezeichner `greeting` gemapped.
* Für die beiden bekannten Nachrichten-Typen sind via `@KafkaHandler`
  separate Handler konfiguriert.
* Unbekannte Nachrichten (z.B. `FooMessage`) landen zusammen mit anderen
  Fehlern in dem DLQ-Topic.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/Greeting.java [new file with mode: 0644]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 4923b09..51e7979 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -21,14 +22,24 @@ import java.util.function.Consumer;
 
 @Configuration
 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@Slf4j
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, ClientMessage>> consumer()
+  public Consumer<ClientMessage> messageHandler()
   {
-    return (record) ->
+    return (message) ->
     {
-      // Handle record
+      log.info("Received ClientMessage: {}", message);
+    };
+  }
+
+  @Bean
+  public Consumer<Greeting> greetingsHandler()
+  {
+    return (greeting) ->
+    {
+      log.info("Received Greeting: {}", greeting);
     };
   }
 
@@ -39,7 +50,7 @@ public class ApplicationConfiguration
         new StringSerializer(),
         new DelegatingByTypeSerializer(Map.of(
             byte[].class, new ByteArraySerializer(),
-            ClientMessage.class, new JsonSerializer<>())));
+            Object.class, new JsonSerializer<>())));
   }
 
   @Bean
@@ -66,7 +77,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
+  public org.apache.kafka.clients.consumer.Consumer<String, Object> kafkaConsumer(ConsumerFactory<String, Object> factory)
   {
     return factory.createConsumer();
   }
index 742550e..bb18a19 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Long> consumer;
+  private final EndlessConsumer consumer;
 
 
   @Override
index 04a0a3a..0a864da 100644 (file)
@@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaHandler;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
@@ -20,14 +21,21 @@ import java.util.function.Consumer;
 @Component
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
+@KafkaListener(
+    id = "${spring.kafka.consumer.client-id}",
+    idIsGroup = false,
+    topics = "${consumer.topic}",
+    autoStartup = "false")
+public class EndlessConsumer implements ConsumerAwareRebalanceListener
 {
   @Autowired
   private KafkaListenerEndpointRegistry registry;
   @Value("${spring.kafka.consumer.client-id}")
   String id;
   @Autowired
-  Consumer<ConsumerRecord<K, V>> handler;
+  Consumer<ClientMessage> messageHandler;
+  @Autowired
+  Consumer<Greeting> greetingsHandler;
 
   private long consumed = 0;
 
@@ -81,25 +89,17 @@ public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
   }
 
 
-  @KafkaListener(
-      id = "${spring.kafka.consumer.client-id}",
-      idIsGroup = false,
-      topics = "${consumer.topic}",
-      autoStartup = "false")
-  public void receive(ConsumerRecord<K, V> record)
+  @KafkaHandler
+  public void receiveGreeting(Greeting greeting)
   {
-    log.info(
-        "{} - {}: {}/{} - {}={}",
-        id,
-        record.offset(),
-        record.topic(),
-        record.partition(),
-        record.key(),
-        record.value()
-    );
-
-    handler.accept(record);
+    greetingsHandler.accept(greeting);
+    consumed++;
+  }
 
+  @KafkaHandler
+  public void receiveMessage(ClientMessage message)
+  {
+    messageHandler.accept(message);
     consumed++;
   }
 
diff --git a/src/main/java/de/juplo/kafka/Greeting.java b/src/main/java/de/juplo/kafka/Greeting.java
new file mode 100644 (file)
index 0000000..d228a70
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+
+@Data
+public class Greeting
+{
+  String name;
+  LocalDateTime when;
+}
index 4b22bd2..75ec38c 100644 (file)
@@ -31,7 +31,7 @@ spring:
       value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
       properties:
         spring.deserializer.value.delegate.class: "org.springframework.kafka.support.serializer.JsonDeserializer"
-        spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
+        spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage,greeting:de.juplo.kafka.Greeting"
         spring.json.trusted.packages: "de.juplo.kafka"
     producer:
       bootstrap-servers: :9092
index 7ec73f3..8a485b3 100644 (file)
@@ -63,8 +63,6 @@ class ApplicationTests
        @Autowired
        KafkaProducer<String, Bytes> kafkaProducer;
        @Autowired
-       org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer;
-       @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
        ApplicationProperties applicationProperties;