a5554c1c89371c3cf646bb3fc652a900608bffda
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.KafkaConsumer;
8 import org.apache.kafka.common.errors.WakeupException;
9 import org.apache.kafka.common.serialization.StringDeserializer;
10
11 import java.time.Duration;
12 import java.util.Arrays;
13 import java.util.Properties;
14 import java.util.concurrent.locks.Condition;
15 import java.util.concurrent.locks.Lock;
16 import java.util.concurrent.locks.ReentrantLock;
17
18
19 @Slf4j
20 public class SimpleConsumer
21 {
22   private long consumed = 0;
23   private Consumer<String, String> consumer;
24   private Lock lock = new ReentrantLock();
25   private Condition stopped = lock.newCondition();
26
27
28   public SimpleConsumer()
29   {
30     this.consumer = null; // TODO: Eine Instanz von KafkaConsumer erzeugen
31   }
32
33
34   public void run()
35   {
36     String id = "C";
37
38     try
39     {
40       log.info("{} - Subscribing to topic test", id);
41       consumer.subscribe(Arrays.asList("test"));
42
43       while (true)
44       {
45         // TODO:
46         // Über consumer.poll() Nachrichten abrufen und diese
47         // über log.info() ausgeben.
48       }
49     }
50     catch(WakeupException e)
51     {
52       log.info("{} - Closing the KafkaConsumer", id);
53       consumer.close();
54     }
55     catch(Exception e)
56     {
57       log.error("{} - Unexpected error: {}", id, e.toString());
58     }
59     finally
60     {
61       log.info("{} - Shutting down...");
62       this.lock.lock();
63       try
64       {
65         log.info("{} - DONE!");
66         stopped.signal();
67       }
68       finally
69       {
70         this.lock.unlock();
71         log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
72       }
73     }
74   }
75
76
77   public static void main(String[] args) throws Exception
78   {
79     SimpleConsumer instance = new SimpleConsumer();
80
81     Runtime.getRuntime().addShutdownHook(new Thread(() ->
82     {
83       instance.lock.lock();
84       try
85       {
86         instance.consumer.wakeup();
87         instance.stopped.await();
88       }
89       catch (InterruptedException e)
90       {
91         log.warn("Interrrupted while waiting for the consumer to stop!", e);
92       }
93       finally
94       {
95         instance.lock.unlock();
96       }
97     }));
98
99     instance.run();
100   }
101 }