From: Kai Moritz Date: Sun, 29 Sep 2024 12:30:42 +0000 (+0200) Subject: Handling der Records wie in abgeleiteten Versionen in eigener Methode X-Git-Tag: consumer/spring-consumer--REBASE-ANFANG~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=15766cf2de84842a08b85c3c9ce3342ce66ef761;p=demos%2Fkafka%2Ftraining Handling der Records wie in abgeleiteten Versionen in eigener Methode --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 98830ba..f722b63 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -61,16 +61,12 @@ public class ExampleConsumer log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); } } } @@ -92,6 +88,17 @@ public class ExampleConsumer } } + private void handleRecord( + String topic, + Integer partition, + Long offset, + String key, + String value) + { + consumed++; + log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); + } + public static void main(String[] args) throws Exception {