Vorlage
[demos/kafka/training] / src / main / java / de / juplo / kafka / SimpleConsumer.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.KafkaConsumer;
8 import org.apache.kafka.common.errors.WakeupException;
9 import org.apache.kafka.common.serialization.StringDeserializer;
10
11 import java.time.Duration;
12 import java.util.Arrays;
13 import java.util.Properties;
14
15
16 @Slf4j
17 public class SimpleConsumer
18 {
19   private final String id;
20   private final String topic;
21   private final Consumer<String, String> consumer;
22
23   private volatile boolean running = false;
24   private long consumed = 0;
25
26   public SimpleConsumer(String broker, String topic, String groupId, String clientId)
27   {
28     Properties props = new Properties();
29     // Konfiguration für den Consumer zusammenstellen
30
31     this.id = clientId;
32     this.topic = topic;
33     consumer = new KafkaConsumer<>(props);
34   }
35
36
37   public void run()
38   {
39     try
40     {
41       log.info("{} - Subscribing to topic {}", id, topic);
42       // TODO: subscribe!
43       running = true;
44
45       while (true)
46       {
47         // TODO:
48         // Über consumer.poll() Nachrichten abrufen und diese
49         // über log.info() ausgeben und in dem Attribut consumed
50         // zählen.
51       }
52     }
53     catch(WakeupException e)
54     {
55       log.info("{} - Consumer was signaled to finish its work", id);
56     }
57     catch(Exception e)
58     {
59       log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString());
60       consumer.unsubscribe();
61     }
62     finally
63     {
64       running = false;
65       log.info("{} - Closing the KafkaConsumer", id);
66       consumer.close();
67       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
68     }
69   }
70
71
72   public static void main(String[] args) throws Exception
73   {
74     String broker = ":9092";
75     String topic = "test";
76     String groupId = "my-group";
77     String clientId = "DEV";
78
79     switch (args.length)
80     {
81       case 4:
82         clientId = args[3];
83       case 3:
84         groupId = args[2];
85       case 2:
86         topic = args[1];
87       case 1:
88         broker = args[0];
89     }
90
91
92     SimpleConsumer instance = new SimpleConsumer(broker, topic, groupId, clientId);
93
94     Runtime.getRuntime().addShutdownHook(new Thread(() ->
95     {
96       instance.consumer.wakeup();
97
98       while (instance.running)
99       {
100         log.info("Waiting for main-thread...");
101         try
102         {
103           Thread.sleep(1000);
104         }
105         catch (InterruptedException e) {}
106       }
107       log.info("Shutdown completed.");
108     }));
109
110     log.info(
111         "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}",
112         broker,
113         topic,
114         groupId,
115         clientId);
116     instance.run();
117   }
118 }