1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.ConsumerRecords;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.KafkaConsumer;
8 import org.apache.kafka.common.errors.WakeupException;
9 import org.apache.kafka.common.serialization.StringDeserializer;
11 import java.time.Duration;
12 import java.util.Arrays;
13 import java.util.Properties;
17 public class SimpleConsumer
19 private final String id;
20 private final String topic;
21 private final Consumer<String, String> consumer;
23 private volatile boolean running = false;
24 private long consumed = 0;
26 public SimpleConsumer(String broker, String topic, String groupId, String clientId)
28 Properties props = new Properties();
29 // Konfiguration für den Consumer zusammenstellen
33 consumer = new KafkaConsumer<>(props);
41 log.info("{} - Subscribing to topic {}", id, topic);
48 // Über consumer.poll() Nachrichten abrufen und diese
49 // über log.info() ausgeben und in dem Attribut consumed
53 catch(WakeupException e)
55 log.info("{} - Consumer was signaled to finish its work", id);
59 log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString());
60 consumer.unsubscribe();
65 log.info("{} - Closing the KafkaConsumer", id);
67 log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
72 public static void main(String[] args) throws Exception
74 String broker = ":9092";
75 String topic = "test";
76 String groupId = "my-group";
77 String clientId = "DEV";
92 SimpleConsumer instance = new SimpleConsumer(broker, topic, groupId, clientId);
94 Runtime.getRuntime().addShutdownHook(new Thread(() ->
96 instance.consumer.wakeup();
98 while (instance.running)
100 log.info("Waiting for main-thread...");
105 catch (InterruptedException e) {}
107 log.info("Shutdown completed.");
111 "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}",