X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;h=cc7effb66f60675489e30dc6765f33173a6f3469;hb=refs%2Ftags%2Fspring-consumer--json--adder--kafkalistener--dlt---lvm-2-tage--easy-path;hp=cea956830395cb18a3f0f7bf8f475752ae4208cc;hpb=059583e058173c3c7bd32ff1d16ba428d407931c;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index cea9568..cc7effb 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -1,78 +1,38 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; -import java.util.concurrent.Callable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; @Slf4j -@RequiredArgsConstructor -public class SimpleConsumer implements Callable +@Component +public class SimpleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; - private final MessageHandler messageHandler; + @Value("${spring.kafka.client-id}") + private String id; + @Autowired + private MessageHandler messageHandler; private long consumed = 0; - @Override - public Integer call() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - return 0; - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); - consumer.unsubscribe(); - return 1; - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - + @KafkaListener(topics = "${simple.consumer.topic}") private void handleRecord( + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Payload Message value) { consumed++;