X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=27c1e446cab68442bdf41ee6bb44baeb5c72e6c5;hb=66ff7d205e66616de8aaca94503dbbcd7d281f6d;hp=d3d11ae976ae268527503aa2611328814c27d0a4;hpb=f095f71a104fcde025a63f87ba75eb5cb3136656;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index d3d11ae..27c1e44 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -3,8 +3,12 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; import java.util.List; import java.util.Optional; @@ -12,42 +16,64 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class EndlessConsumer +@KafkaListener( + id = "${spring.kafka.client-id}", + idIsGroup = false, + topics = "${sumup.adder.topic}", + autoStartup = "false") +public class EndlessConsumer { private final String id; private final KafkaListenerEndpointRegistry registry; - private final ApplicationErrorHandler errorHandler; - private final RecordHandler recordHandler; + private final RecordHandler recordHandler; private long consumed = 0; - @KafkaListener( - id = "${spring.kafka.client-id}", - idIsGroup = false, - topics = "${sumup.adder.topic}", - batch = "true", - autoStartup = "false") - public void accept(List> records) + @KafkaHandler + public void addNumber( + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Payload MessageAddNumber message) { - // Do something with the data... - log.info("{} - Received {} messages", id, records.size()); - for (ConsumerRecord record : records) - { log.info( "{} - {}: {}/{} - {}={}", id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() + offset, + topic, + partition, + key, + message ); - recordHandler.accept(record); + recordHandler.addNumber(topic, partition, offset, key, message); + + consumed++; + } + + @KafkaHandler + public void calculateSum( + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Payload MessageCalculateSum message) + { + log.info( + "{} - {}: {}/{} - {}={}", + id, + offset, + topic, + partition, + key, + message + ); + + recordHandler.calculateSum(topic, partition, offset, key, message); consumed++; - } } public void start() @@ -56,7 +82,6 @@ public class EndlessConsumer throw new IllegalStateException("Consumer instance " + id + " is already running!"); log.info("{} - Starting - consumed {} messages before", id, consumed); - errorHandler.clearState(); registry.getListenerContainer(id).start(); } @@ -74,12 +99,4 @@ public class EndlessConsumer { return registry.getListenerContainer(id).isRunning(); } - - public Optional exitStatus() - { - if (running()) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return errorHandler.getException(); - } }