"Fixed" the bug by adding a default-handler
[demos/kafka/kafkahandler] / src / main / java / de / juplo / kafka / MultiMessageConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.Getter;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.kafka.annotation.KafkaHandler;
6 import org.springframework.kafka.annotation.KafkaListener;
7 import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
8 import org.springframework.kafka.support.KafkaHeaders;
9 import org.springframework.messaging.handler.annotation.Header;
10 import org.springframework.messaging.handler.annotation.Payload;
11 import org.springframework.stereotype.Component;
12
13 import java.util.LinkedList;
14 import java.util.List;
15
16
17 @Component
18 @KafkaListener(topics = "test")
19 @Slf4j
20 @Getter
21 public class MultiMessageConsumer
22 {
23   private final List<Foo> foos = new LinkedList<>();
24   private final List<Bar> bars = new LinkedList<>();
25
26
27   @KafkaHandler
28   public void handleFoo(
29       @Payload Foo foo,
30       @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
31   {
32     log.info(
33         "Received a Foo: {}, topic={}, partition={}, offset={}",
34         foo,
35         metadata.topic(),
36         metadata.partition(),
37         metadata.offset());
38     foos.add(foo);
39   }
40
41   @KafkaHandler
42   public void handleBar(
43       @Payload Bar bar,
44       @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
45   {
46     log.info(
47         "Received a Bar: {}",
48         bar,
49         metadata.topic(),
50         metadata.partition(),
51         metadata.offset());
52     bars.add(bar);
53   }
54
55   @KafkaHandler(isDefault = true)
56   void handleUnknown(
57       @Payload Object unknown,
58       @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata metadata)
59   {
60     log.info(
61         "Received an unknown message: {}",
62         unknown,
63         metadata.topic(),
64         metadata.partition(),
65         metadata.offset());
66   }
67 }