package de.juplo.kafka;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
 @Configuration
 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@Slf4j
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, ClientMessage>> consumer()
+  public Consumer<ClientMessage> messageHandler()
   {
-    return (record) ->
+    return (message) ->
     {
-      // Handle record
+      log.info("Received ClientMessage: {}", message);
+    };
+  }
+
+  @Bean
+  public Consumer<Greeting> greetingsHandler()
+  {
+    return (greeting) ->
+    {
+      log.info("Received Greeting: {}", greeting);
     };
   }
 
         new StringSerializer(),
         new DelegatingByTypeSerializer(Map.of(
             byte[].class, new ByteArraySerializer(),
-            ClientMessage.class, new JsonSerializer<>())));
+            Object.class, new JsonSerializer<>())));
   }
 
   @Bean
   }
 
   @Bean(destroyMethod = "close")
-  public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
+  public org.apache.kafka.clients.consumer.Consumer<String, Object> kafkaConsumer(ConsumerFactory<String, Object> factory)
   {
     return factory.createConsumer();
   }
 
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Long> consumer;
+  private final EndlessConsumer consumer;
 
 
   @Override
 
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaHandler;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
 @Component
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
+@KafkaListener(
+    id = "${spring.kafka.consumer.client-id}",
+    idIsGroup = false,
+    topics = "${consumer.topic}",
+    autoStartup = "false")
+public class EndlessConsumer implements ConsumerAwareRebalanceListener
 {
   @Autowired
   private KafkaListenerEndpointRegistry registry;
   @Value("${spring.kafka.consumer.client-id}")
   String id;
   @Autowired
-  Consumer<ConsumerRecord<K, V>> handler;
+  Consumer<ClientMessage> messageHandler;
+  @Autowired
+  Consumer<Greeting> greetingsHandler;
 
   private long consumed = 0;
 
   }
 
 
-  @KafkaListener(
-      id = "${spring.kafka.consumer.client-id}",
-      idIsGroup = false,
-      topics = "${consumer.topic}",
-      autoStartup = "false")
-  public void receive(ConsumerRecord<K, V> record)
+  @KafkaHandler
+  public void receiveGreeting(Greeting greeting)
   {
-    log.info(
-        "{} - {}: {}/{} - {}={}",
-        id,
-        record.offset(),
-        record.topic(),
-        record.partition(),
-        record.key(),
-        record.value()
-    );
-
-    handler.accept(record);
+    greetingsHandler.accept(greeting);
+    consumed++;
+  }
 
+  @KafkaHandler
+  public void receiveMessage(ClientMessage message)
+  {
+    messageHandler.accept(message);
     consumed++;
   }
 
 
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+
+@Data
+public class Greeting
+{
+  String name;
+  LocalDateTime when;
+}
 
       value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
       properties:
         spring.deserializer.value.delegate.class: "org.springframework.kafka.support.serializer.JsonDeserializer"
-        spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
+        spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage,greeting:de.juplo.kafka.Greeting"
         spring.json.trusted.packages: "de.juplo.kafka"
     producer:
       bootstrap-servers: :9092
 
        @Autowired
        KafkaProducer<String, Bytes> kafkaProducer;
        @Autowired
-       org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer;
-       @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
        ApplicationProperties applicationProperties;