`@KafkaListener`-Version des `spring-consumer` mit Business-Logik
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.beans.factory.annotation.Value;
6 import org.springframework.kafka.annotation.KafkaListener;
7 import org.springframework.kafka.support.KafkaHeaders;
8 import org.springframework.messaging.handler.annotation.Header;
9 import org.springframework.messaging.handler.annotation.Payload;
10 import org.springframework.stereotype.Component;
11
12
13 @Slf4j
14 @Component
15 public class SimpleConsumer
16 {
17   @Value("${spring.kafka.client-id}")
18   private String id;
19   @Autowired
20   private MessageHandler messageHandler;
21
22   private long consumed = 0;
23
24
25   @KafkaListener(topics = "${simple.consumer.topic}")
26   private void handleRecord(
27     @Header(KafkaHeaders.RECEIVED_TOPIC)
28     String topic,
29     @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
30     Integer partition,
31     @Header(KafkaHeaders.OFFSET)
32     Long offset,
33     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
34     String key,
35     @Payload
36     Message value)
37   {
38     consumed++;
39     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
40     messageHandler.handle(key, value);
41   }
42 }