From: Kai Moritz Date: Mon, 28 Oct 2024 07:00:48 +0000 (+0100) Subject: Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String` X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-13--si~22 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=c6d1ded087826ef257b54c69fb7c654b21d55a81;p=demos%2Fkafka%2Ftraining Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String` * Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten auch einfach mit `kafkacat` gelesen werden können. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c1fe03a..e454ed0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -5,7 +5,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -23,7 +22,7 @@ public class ApplicationConfiguration @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, - Producer kafkaProducer, + Producer kafkaProducer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -61,7 +60,7 @@ public class ApplicationConfiguration } @Bean - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -74,7 +73,7 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getProducerProperties().getLingerMs()); props.put("compression.type", properties.getProducerProperties().getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", LongSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index da845bd..aa489c4 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -29,7 +29,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Map counterState = new HashMap<>(); private final String stateTopic; - private final Producer producer; + private final Producer producer; private volatile boolean running = false; private final Phaser phaser = new Phaser(1); @@ -44,7 +44,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String topic, Consumer consumer, String stateTopic, - Producer producer, + Producer producer, Runnable closeCallback) { this.id = clientId; @@ -153,7 +153,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener void sendCounterState(int partition, String key, Long counter) { seen[partition]++; - ProducerRecord record = new ProducerRecord<>(stateTopic, key, counter); + ProducerRecord record = new ProducerRecord<>(stateTopic, key, counter.toString()); producer.send(record, ((metadata, exception) -> { if (exception == null)