From: Kai Moritz Date: Thu, 20 Feb 2025 17:36:13 +0000 (+0100) Subject: `ExampleConsumer` über Generics typisiert X-Git-Tag: consumer/spring-consumer--COMMITS--2025-02 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=89b656a0119649a2d01eb89208d5d9c0b6122d98;p=demos%2Fkafka%2Ftraining `ExampleConsumer` über Generics typisiert --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a4856a64..798abacd 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -17,13 +17,13 @@ import java.util.Properties; public class ApplicationConfiguration { @Bean - public ExampleConsumer exampleConsumer( + public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { return - new ExampleConsumer( + new ExampleConsumer<>( properties.getClientId(), properties.getConsumerProperties().getTopic(), kafkaConsumer, diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 79382ef9..a6691c3b 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -11,11 +11,11 @@ import java.util.Arrays; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -26,7 +26,7 @@ public class ExampleConsumer implements Runnable public ExampleConsumer( String clientId, String topic, - Consumer consumer, + Consumer consumer, Runnable closeCallback) { this.id = clientId; @@ -51,11 +51,10 @@ public class ExampleConsumer implements Runnable while (running) { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { handleRecord( record.topic(), @@ -89,8 +88,8 @@ public class ExampleConsumer implements Runnable String topic, Integer partition, Long offset, - String key, - String value) + K key, + V value) { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);