X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=0000000000000000000000000000000000000000;hb=25c2044064722af20f64651a32e94fb392710bbc;hp=27c1e446cab68442bdf41ee6bb44baeb5c72e6c5;hpb=66ff7d205e66616de8aaca94503dbbcd7d281f6d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java deleted file mode 100644 index 27c1e44..0000000 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ /dev/null @@ -1,102 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.KafkaHandler; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; - -import java.util.List; -import java.util.Optional; - - -@RequiredArgsConstructor -@Slf4j -@KafkaListener( - id = "${spring.kafka.client-id}", - idIsGroup = false, - topics = "${sumup.adder.topic}", - autoStartup = "false") -public class EndlessConsumer -{ - private final String id; - private final KafkaListenerEndpointRegistry registry; - private final RecordHandler recordHandler; - - private long consumed = 0; - - - @KafkaHandler - public void addNumber( - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, - @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, - @Header(KafkaHeaders.OFFSET) Long offset, - @Payload MessageAddNumber message) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - offset, - topic, - partition, - key, - message - ); - - recordHandler.addNumber(topic, partition, offset, key, message); - - consumed++; - } - - @KafkaHandler - public void calculateSum( - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, - @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, - @Header(KafkaHeaders.OFFSET) Long offset, - @Payload MessageCalculateSum message) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - offset, - topic, - partition, - key, - message - ); - - recordHandler.calculateSum(topic, partition, offset, key, message); - - consumed++; - } - - public void start() - { - if (running()) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - registry.getListenerContainer(id).start(); - } - - public void stop() - { - if (!running()) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - registry.getListenerContainer(id).stop(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - - public boolean running() - { - return registry.getListenerContainer(id).isRunning(); - } -}