--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Component;
+
+
+@Slf4j
+@Component
+public class SimpleConsumer
+{
+ @Value("${spring.kafka.client-id}")
+ private String id;
+ private long consumed = 0;
+
+ @KafkaListener(topics = "${simple.consumer.topic}")
+ private void handleRecord(
+ @Header(KafkaHeaders.RECEIVED_TOPIC)
+ String topic,
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
+ Integer partition,
+ @Header(KafkaHeaders.OFFSET)
+ Long offset,
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
+ String key,
+ @Payload
+ String value)
+ {
+ consumed++;
+ log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+ }
+}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.stereotype.Component;
-
-
-@Slf4j
-@Component
-public class SimpleConsumer
-{
- @Value("${spring.kafka.client-id}")
- private String id;
- private long consumed = 0;
-
- @KafkaListener(topics = "${simple.consumer.topic}")
- private void handleRecord(
- @Header(KafkaHeaders.RECEIVED_TOPIC)
- String topic,
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
- Integer partition,
- @Header(KafkaHeaders.OFFSET)
- Long offset,
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
- String key,
- @Payload
- String value)
- {
- consumed++;
- log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
- }
-}