X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;h=0e376864d1958d477117a5b05be13e0515a2de75;hb=5b9f4cd21a87b03cb1c432e9965fb0082ab05dd3;hp=9741e672b0e57636cc9b642e94f0aa8366cd5e8a;hpb=bc8304e07ecbb240fcd02e26a7387006e58c3c42;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 9741e67..0e37686 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -1,52 +1,34 @@ package de.juplo.kafka; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; -import java.util.Properties; @Slf4j -public class SimpleConsumer +@RequiredArgsConstructor +public class SimpleConsumer implements Runnable { private final String id; private final String topic; - private final KafkaConsumer consumer; + private final Consumer consumer; - private volatile boolean running = false; private long consumed = 0; - public SimpleConsumer(String broker, String topic, String groupId, String clientId) - { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("auto.offset.reset", "earliest"); // Von Beginn an lesen - props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - consumer = new KafkaConsumer<>(props); - - this.topic = topic; - this.id = clientId; - } - + @Override public void run() { try { log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); - running = true; while (true) { @@ -80,58 +62,9 @@ public class SimpleConsumer } finally { - running = false; log.info("{} - Closing the KafkaConsumer", id); consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } - - - public static void main(String[] args) throws Exception - { - String broker = ":9092"; - String topic = "test"; - String groupId = "my-group"; - String clientId = "DEV"; - - switch (args.length) - { - case 4: - clientId = args[3]; - case 3: - groupId = args[2]; - case 2: - topic = args[1]; - case 1: - broker = args[0]; - } - - - SimpleConsumer instance = new SimpleConsumer(broker, topic, groupId, clientId); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - log.info( - "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}", - broker, - topic, - groupId, - clientId); - instance.run(); - } }