From: Kai Moritz Date: Wed, 2 Nov 2022 17:36:14 +0000 (+0100) Subject: WIP X-Git-Tag: simple-consumer--json-DEPRECATED~27 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ffd5ad8116f8269ae828a7732cf2bd862f7ba095;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index b4a960d..94224e1 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -11,8 +11,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.ConsumerFactory; import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; @SpringBootApplication diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java new file mode 100644 index 0000000..0d371f4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -0,0 +1,80 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.WakeupException; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; + + +@Slf4j +@RequiredArgsConstructor +public class SimpleConsumer implements Runnable +{ + private final ExecutorService executor; + private final String id; + private final String topic; + private final Consumer consumer; + + private volatile boolean running = false; + private long consumed = 0; + + + @Override + public void run() + { + try + { + log.info("{} - Subscribing to topic test", id); + consumer.subscribe(Arrays.asList("test")); + running = true; + + while (true) + { + ConsumerRecords records = + consumer.poll(Duration.ofSeconds(1)); + + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) + { + consumed++; + log.info( + "{} - {}: {}/{} - {}={}", + id, + record.offset(), + record.topic(), + record.partition(), + record.key(), + record.value() + ); + } + } + } + catch(WakeupException e) + { + log.info("{} - Consumer was signaled to finish its work", id); + } + catch(Exception e) + { + log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); + consumer.unsubscribe(); + } + finally + { + running = false; + log.info("{} - Closing the KafkaConsumer", id); + consumer.close(); + log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + } + } + + public void start() + { + executor.submit(this); + } +}