Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String`
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 07:00:48 +0000 (08:00 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 13:49:59 +0000 (14:49 +0100)
* Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten
  auch einfach mit `kafkacat` gelesen werden können.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 5e4302c..c3c4e79 100644 (file)
@@ -5,7 +5,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.StickyAssignor;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -22,7 +21,7 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer exampleConsumer(
       Consumer<String, String> kafkaConsumer,
-      Producer<String, Long> kafkaProducer,
+      Producer<String, String> kafkaProducer,
       ApplicationProperties properties)
   {
     return
@@ -58,7 +57,7 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public KafkaProducer<String, Long> kafkaProducer(ApplicationProperties properties)
+  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
@@ -71,7 +70,7 @@ public class ApplicationConfiguration
     props.put("linger.ms", properties.getProducerProperties().getLingerMs());
     props.put("compression.type", properties.getProducerProperties().getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", LongSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
 
     return new KafkaProducer<>(props);
   }
index 77cef8d..7d8e199 100644 (file)
@@ -28,7 +28,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private final Map<String, Long> counterState = new HashMap<>();
   private final String stateTopic;
-  Producer<String, Long> producer;
+  Producer<String, String> producer;
 
   private volatile boolean running = false;
   private final Phaser phaser = new Phaser(1);
@@ -43,7 +43,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     String topic,
     Consumer<String, String> consumer,
     String stateTopic,
-    Producer<String, Long> producer)
+    Producer<String, String> producer)
   {
     this.id = clientId;
     this.topic = topic;
@@ -145,7 +145,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   void sendCounterState(int partition, String key, Long counter)
   {
     seen[partition]++;
-    ProducerRecord<String, Long> record = new ProducerRecord<>(stateTopic, key, counter);
+    ProducerRecord<String, String> record = new ProducerRecord<>(stateTopic, key, counter.toString());
     producer.send(record, ((metadata, exception) ->
     {
       if (exception == null)