`ExampleConsumer` über Generics typisiert producer/spring-producer--generics producer/spring-producer--generics--2025-03-signal producer/spring-producer--generics--2025-04-signal
authorKai Moritz <kai@juplo.de>
Sat, 1 Mar 2025 16:42:14 +0000 (17:42 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 17 Mar 2025 15:58:03 +0000 (16:58 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/java/de/juplo/kafka/KeyGenerator.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ValueGenerator.java [new file with mode: 0644]

index 0090cee..feb27a4 100644 (file)
@@ -19,6 +19,8 @@ public class ApplicationConfiguration
   @Bean
   public ExampleProducer exampleProducer(
     ApplicationProperties properties,
+    KeyGenerator<String> keyGenerator,
+    ValueGenerator<String> valueGenerator,
     Producer<String, String> kafkaProducer,
     ConfigurableApplicationContext applicationContext)
   {
@@ -29,10 +31,24 @@ public class ApplicationConfiguration
         properties.getProducerProperties().getThrottle() == null
           ? Duration.ofMillis(500)
           : properties.getProducerProperties().getThrottle(),
+        keyGenerator,
+        valueGenerator,
         kafkaProducer,
         () -> applicationContext.close());
   }
 
+  @Bean
+  KeyGenerator<String> keyGenerator()
+  {
+    return i -> Long.toString(i%10);
+  }
+
+  @Bean
+  ValueGenerator<String> messageGenerator()
+  {
+    return i -> Long.toString(i);
+  }
+
   @Bean(destroyMethod = "")
   public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
   {
index 25e885d..df5bdb6 100644 (file)
@@ -8,12 +8,14 @@ import java.time.Duration;
 
 
 @Slf4j
-public class ExampleProducer implements Runnable
+public class ExampleProducer<K, V> implements Runnable
 {
   private final String id;
   private final String topic;
   private final Duration throttle;
-  private final Producer<String, String> producer;
+  private final KeyGenerator<K> keyGenerator;
+  private final ValueGenerator<V> valueGenerator;
+  private final Producer<K, V> producer;
   private final Thread workerThread;
   private final Runnable closeCallback;
 
@@ -25,12 +27,16 @@ public class ExampleProducer implements Runnable
     String id,
     String topic,
     Duration throttle,
-    Producer<String, String> producer,
+    KeyGenerator<K> keyGenerator,
+    ValueGenerator<V> valueGenerator,
+    Producer<K, V> producer,
     Runnable closeCallback)
   {
     this.id = id;
     this.topic = topic;
     this.throttle = throttle;
+    this.keyGenerator = keyGenerator;
+    this.valueGenerator = valueGenerator;
     this.producer = producer;
 
     workerThread = new Thread(this, "ExampleProducer Worker-Thread");
@@ -49,7 +55,7 @@ public class ExampleProducer implements Runnable
     {
       for (; running; i++)
       {
-        send(Long.toString(i%10), Long.toString(i));
+        send(keyGenerator.generateKeyFor(i), valueGenerator.generateValueFor(i));
 
         if (throttle.isPositive())
         {
@@ -78,11 +84,11 @@ public class ExampleProducer implements Runnable
     }
   }
 
-  void send(String key, String value)
+  void send(K key, V value)
   {
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
+    final ProducerRecord<K, V> record = new ProducerRecord<>(
       topic,  // Topic
       key,    // Key
       value   // Value
diff --git a/src/main/java/de/juplo/kafka/KeyGenerator.java b/src/main/java/de/juplo/kafka/KeyGenerator.java
new file mode 100644 (file)
index 0000000..978e007
--- /dev/null
@@ -0,0 +1,6 @@
+package de.juplo.kafka;
+
+public interface KeyGenerator<K>
+{
+  K generateKeyFor(long i);
+}
diff --git a/src/main/java/de/juplo/kafka/ValueGenerator.java b/src/main/java/de/juplo/kafka/ValueGenerator.java
new file mode 100644 (file)
index 0000000..3053072
--- /dev/null
@@ -0,0 +1,6 @@
+package de.juplo.kafka;
+
+public interface ValueGenerator<V>
+{
+  V generateValueFor(long i);
+}