Springify: Kernfunktion von EndlessConsumer über Spring-Kafka
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.annotation.KafkaListener;
9 import org.springframework.stereotype.Component;
10
11 import java.util.function.Consumer;
12
13
14 @Component
15 @Slf4j
16 @RequiredArgsConstructor
17 public class EndlessConsumer<K, V>
18 {
19   @Value("${consumer.client-id}")
20   String id;
21   @Autowired
22   Consumer<ConsumerRecord<K, V>> handler;
23
24
25   @KafkaListener(topics = "${consumer.topic}")
26   public void receive(ConsumerRecord<K, V> record)
27   {
28     log.info(
29         "{} - {}: {}/{} - {}={}",
30         id,
31         record.offset(),
32         record.topic(),
33         record.partition(),
34         record.key(),
35         record.value()
36     );
37
38     handler.accept(record);
39   }
40 }