929bdbd5ccf659c20cf86005686ed94406d7d7c6
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.beans.factory.annotation.Autowired;
7 import org.springframework.beans.factory.annotation.Value;
8 import org.springframework.kafka.annotation.KafkaListener;
9 import org.springframework.stereotype.Component;
10
11 import java.util.List;
12 import java.util.function.Consumer;
13
14
15 @Component
16 @Slf4j
17 @RequiredArgsConstructor
18 public class EndlessConsumer<K, V>
19 {
20   @Value("${consumer.client-id}")
21   String id;
22   @Autowired
23   Consumer<ConsumerRecord<K, V>> handler;
24
25
26   @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory")
27   public void receive(List<ConsumerRecord<K, V>> records)
28   {
29     // Do something with the data...
30     log.info("{} - Received {} messages", id, records.size());
31     for (ConsumerRecord<K, V> record : records)
32     {
33       log.info(
34           "{} - {}: {}/{} - {}={}",
35           id,
36           record.offset(),
37           record.topic(),
38           record.partition(),
39           record.key(),
40           record.value()
41       );
42
43       handler.accept(record);
44     }
45   }
46 }