X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;h=e9ce220ef33aeae424b3b6cbacc16a255e6af6ea;hb=refs%2Ftags%2Fspring%2Fspring-consumer--kafkalistener--vorlage---2023-06-signal;hp=0e376864d1958d477117a5b05be13e0515a2de75;hpb=5b9f4cd21a87b03cb1c432e9965fb0082ab05dd3;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 0e37686..e9ce220 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -1,70 +1,30 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; @Slf4j -@RequiredArgsConstructor -public class SimpleConsumer implements Runnable +@Component +public class SimpleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; - + @Value("${spring.kafka.client-id}") + private String id; private long consumed = 0; - - @Override - public void run() + private void handleRecord( + String topic, + Integer partition, + Long offset, + String key, + String value) { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); - consumer.unsubscribe(); - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } + consumed++; + log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); } }