From: Kai Moritz Date: Mon, 7 Apr 2025 20:37:35 +0000 (+0200) Subject: Der Consumer reicht Key/Value als `byte[]` durch X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=89e39a3fbe1e39011fdb6db5bbb04504e5fac48c;p=demos%2Fkafka%2Ftraining Der Consumer reicht Key/Value als `byte[]` durch --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 8b11dad..3feee95 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -17,7 +18,7 @@ public class ApplicationConfiguration { @Bean public DeadLetterConsumer deadLetterConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -31,15 +32,15 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", properties.getClientId()); props.put("group.id", properties.getConsumer().getGroupId()); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("key.deserializer", ByteArrayDeserializer.class.getName()); + props.put("value.deserializer", ByteArrayDeserializer.class.getName()); props.put("auto.offset.reset", "none"); return new KafkaConsumer<>(props); diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index b8a32c5..a19cd3f 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -34,7 +34,7 @@ public class DeadLetterConsumer implements Runnable private final int numPartitions; private final Queue[] pendingFetchRequests; private final FetchRequest[] currentFetchRequest; - private final Consumer consumer; + private final Consumer consumer; private final Thread workerThread; private final Runnable closeCallback; @@ -45,7 +45,7 @@ public class DeadLetterConsumer implements Runnable String clientId, String topic, String headerPrefix, - Consumer consumer, + Consumer consumer, Runnable closeCallback) { this.id = clientId; @@ -88,13 +88,13 @@ public class DeadLetterConsumer implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); log.info("{} - Received {} messages", id, records.count()); List partitionsToPause = new LinkedList<>(); for (TopicPartition partition : records.partitions()) { - for (ConsumerRecord record : records.records(partition)) + for (ConsumerRecord record : records.records(partition)) { log.info( "{} - fetched partition={}-{}, offset={}: {}", @@ -209,14 +209,14 @@ public class DeadLetterConsumer implements Runnable currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest; consumer.seek(fetchRequest.partition(), fetchRequest.offset()); } - Mono> requestRecord(int partition, long offset) + Mono> requestRecord(int partition, long offset) { if (partition >= numPartitions || partition < 0) { throw new NonExistentPartitionException(topic, partition); } - CompletableFuture> future = new CompletableFuture<>(); + CompletableFuture> future = new CompletableFuture<>(); FetchRequest fetchRequest = new FetchRequest( new TopicPartition(topic, partition), diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java index c0d1392..6d84e84 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterController.java +++ b/src/main/java/de/juplo/kafka/DeadLetterController.java @@ -29,7 +29,7 @@ public class DeadLetterController @GetMapping(path = "/{partition}/{offset}") - public Mono> recordAtOffset( + public Mono> recordAtOffset( @PathVariable int partition, @PathVariable long offset) { @@ -40,7 +40,7 @@ public class DeadLetterController .contentType(mediaType) .header( deadLetterConsumer.prefixed(DeadLetterConsumer.KEY), - UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8)) + UriUtils.encodePathSegment(new String(record.key()), StandardCharsets.UTF_8)) .header( deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP), Long.toString(record.timestamp())) diff --git a/src/main/java/de/juplo/kafka/FetchRequest.java b/src/main/java/de/juplo/kafka/FetchRequest.java index ff186b7..62028c3 100644 --- a/src/main/java/de/juplo/kafka/FetchRequest.java +++ b/src/main/java/de/juplo/kafka/FetchRequest.java @@ -9,7 +9,7 @@ import java.util.concurrent.CompletableFuture; public record FetchRequest( TopicPartition partition, long offset, - CompletableFuture> future) + CompletableFuture> future) { @Override public String toString()