From ab648ead57c6400302ef0f9ef94632cb956007f3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 28 Oct 2024 08:00:48 +0100 Subject: [PATCH] Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String` MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten auch einfach mit `kafkacat` gelesen werden können. --- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 7 +++---- src/main/java/de/juplo/kafka/ExampleConsumer.java | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 5e4302c..c3c4e79 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -5,7 +5,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -22,7 +21,7 @@ public class ApplicationConfiguration @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, - Producer kafkaProducer, + Producer kafkaProducer, ApplicationProperties properties) { return @@ -58,7 +57,7 @@ public class ApplicationConfiguration } @Bean - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -71,7 +70,7 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getProducerProperties().getLingerMs()); props.put("compression.type", properties.getProducerProperties().getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", LongSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 77cef8d..7d8e199 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -28,7 +28,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Map counterState = new HashMap<>(); private final String stateTopic; - Producer producer; + Producer producer; private volatile boolean running = false; private final Phaser phaser = new Phaser(1); @@ -43,7 +43,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String topic, Consumer consumer, String stateTopic, - Producer producer) + Producer producer) { this.id = clientId; this.topic = topic; @@ -145,7 +145,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener void sendCounterState(int partition, String key, Long counter) { seen[partition]++; - ProducerRecord record = new ProducerRecord<>(stateTopic, key, counter); + ProducerRecord record = new ProducerRecord<>(stateTopic, key, counter.toString()); producer.send(record, ((metadata, exception) -> { if (exception == null) -- 2.20.1