From: Kai Moritz Date: Thu, 14 Nov 2024 19:42:25 +0000 (+0100) Subject: `SimpleConsumer` in `ExampleConsumer` umbenannt -- MOVE X-Git-Tag: spring/spring-consumer--kafkalistener--BRANCH-ENDE~7 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d9104e684117a2c70cd57f9b5644b137cc19f11b;p=demos%2Fkafka%2Ftraining `SimpleConsumer` in `ExampleConsumer` umbenannt -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java new file mode 100644 index 0000000..fe0479f --- /dev/null +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -0,0 +1,36 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + + +@Slf4j +@Component +public class SimpleConsumer +{ + @Value("${spring.kafka.client-id}") + private String id; + private long consumed = 0; + + @KafkaListener(topics = "${simple.consumer.topic}") + private void handleRecord( + @Header(KafkaHeaders.RECEIVED_TOPIC) + String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) + Integer partition, + @Header(KafkaHeaders.OFFSET) + Long offset, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) + String key, + @Payload + String value) + { + consumed++; + log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); + } +} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java deleted file mode 100644 index fe0479f..0000000 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ /dev/null @@ -1,36 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.stereotype.Component; - - -@Slf4j -@Component -public class SimpleConsumer -{ - @Value("${spring.kafka.client-id}") - private String id; - private long consumed = 0; - - @KafkaListener(topics = "${simple.consumer.topic}") - private void handleRecord( - @Header(KafkaHeaders.RECEIVED_TOPIC) - String topic, - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) - Integer partition, - @Header(KafkaHeaders.OFFSET) - Long offset, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) - String key, - @Payload - String value) - { - consumed++; - log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); - } -}