X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;h=31c7e6b556df41c14ac9cc3e960ba31653ea01a3;hb=17b25ab1fb29e6445a25dbc604f261322bdda0ec;hp=1cf9b22595100a3a32cbe620437b6cd809a05b32;hpb=8f14291a52872141ded7ecead6b1f7f9fbc969e4;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 1cf9b22..31c7e6b 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -1,74 +1,36 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; @Slf4j -@RequiredArgsConstructor -public class SimpleConsumer implements Runnable +@Component +public class SimpleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; - + @Value("${spring.kafka.client-id}") + private String id; private long consumed = 0; - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); - consumer.unsubscribe(); - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - + @KafkaListener( + id = "${spring.kafka.client-id}", + groupId = "${spring.kafka.consumer.group-id}", + topics = "${simple.consumer.topic}") private void handleRecord( + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Payload String value) { consumed++;