From ca58798792c349951fb5d25db6f9a4efb6bbde71 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 1 Mar 2025 17:42:14 +0100 Subject: [PATCH] =?utf8?q?`ExampleConsumer`=20=C3=BCber=20Generics=20typis?= =?utf8?q?iert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 16 ++++++++++++++++ .../java/de/juplo/kafka/ExampleProducer.java | 18 ++++++++++++------ src/main/java/de/juplo/kafka/KeyGenerator.java | 6 ++++++ .../java/de/juplo/kafka/ValueGenerator.java | 6 ++++++ 4 files changed, 40 insertions(+), 6 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/KeyGenerator.java create mode 100644 src/main/java/de/juplo/kafka/ValueGenerator.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0090ceea..feb27a40 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -19,6 +19,8 @@ public class ApplicationConfiguration @Bean public ExampleProducer exampleProducer( ApplicationProperties properties, + KeyGenerator keyGenerator, + ValueGenerator valueGenerator, Producer kafkaProducer, ConfigurableApplicationContext applicationContext) { @@ -29,10 +31,24 @@ public class ApplicationConfiguration properties.getProducerProperties().getThrottle() == null ? Duration.ofMillis(500) : properties.getProducerProperties().getThrottle(), + keyGenerator, + valueGenerator, kafkaProducer, () -> applicationContext.close()); } + @Bean + KeyGenerator keyGenerator() + { + return i -> Long.toString(i%10); + } + + @Bean + ValueGenerator messageGenerator() + { + return i -> Long.toString(i); + } + @Bean(destroyMethod = "") public KafkaProducer kafkaProducer(ApplicationProperties properties) { diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 25e885d9..df5bdb6c 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -8,12 +8,14 @@ import java.time.Duration; @Slf4j -public class ExampleProducer implements Runnable +public class ExampleProducer implements Runnable { private final String id; private final String topic; private final Duration throttle; - private final Producer producer; + private final KeyGenerator keyGenerator; + private final ValueGenerator valueGenerator; + private final Producer producer; private final Thread workerThread; private final Runnable closeCallback; @@ -25,12 +27,16 @@ public class ExampleProducer implements Runnable String id, String topic, Duration throttle, - Producer producer, + KeyGenerator keyGenerator, + ValueGenerator valueGenerator, + Producer producer, Runnable closeCallback) { this.id = id; this.topic = topic; this.throttle = throttle; + this.keyGenerator = keyGenerator; + this.valueGenerator = valueGenerator; this.producer = producer; workerThread = new Thread(this, "ExampleProducer Worker-Thread"); @@ -49,7 +55,7 @@ public class ExampleProducer implements Runnable { for (; running; i++) { - send(Long.toString(i%10), Long.toString(i)); + send(keyGenerator.generateKeyFor(i), valueGenerator.generateValueFor(i)); if (throttle.isPositive()) { @@ -78,11 +84,11 @@ public class ExampleProducer implements Runnable } } - void send(String key, String value) + void send(K key, V value) { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key value // Value diff --git a/src/main/java/de/juplo/kafka/KeyGenerator.java b/src/main/java/de/juplo/kafka/KeyGenerator.java new file mode 100644 index 00000000..978e007d --- /dev/null +++ b/src/main/java/de/juplo/kafka/KeyGenerator.java @@ -0,0 +1,6 @@ +package de.juplo.kafka; + +public interface KeyGenerator +{ + K generateKeyFor(long i); +} diff --git a/src/main/java/de/juplo/kafka/ValueGenerator.java b/src/main/java/de/juplo/kafka/ValueGenerator.java new file mode 100644 index 00000000..3053072b --- /dev/null +++ b/src/main/java/de/juplo/kafka/ValueGenerator.java @@ -0,0 +1,6 @@ +package de.juplo.kafka; + +public interface ValueGenerator +{ + V generateValueFor(long i); +} -- 2.20.1