From 230dc066483a138e72317503ee66cda542f895e9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 7 Apr 2025 22:37:35 +0200 Subject: [PATCH] Der Consumer reicht Key/Value als `byte[]` durch --- .../de/juplo/kafka/ApplicationConfiguration.java | 9 +++++---- src/main/java/de/juplo/kafka/DeadLetterConsumer.java | 12 ++++++------ .../java/de/juplo/kafka/DeadLetterController.java | 4 ++-- src/main/java/de/juplo/kafka/FetchRequest.java | 2 +- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index e3dfc9df..49a0b66f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -17,7 +18,7 @@ public class ApplicationConfiguration { @Bean public DeadLetterConsumer deadLetterConsumer( - Consumer kafkaConsumer, + Consumer kafkaConsumer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -30,15 +31,15 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", properties.getClientId()); props.put("group.id", properties.getConsumer().getGroupId()); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("key.deserializer", ByteArrayDeserializer.class.getName()); + props.put("value.deserializer", ByteArrayDeserializer.class.getName()); props.put("enable.auto.commit", false); props.put("auto.offset.reset", "none"); diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java index 1a6d1c93..47df7b2d 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java +++ b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java @@ -35,7 +35,7 @@ public class DeadLetterConsumer implements Runnable, SmartLifecycle private final int numPartitions; private final Queue[] pendingFetchRequests; private final FetchRequest[] currentFetchRequest; - private final Consumer consumer; + private final Consumer consumer; private Thread workerThread; private volatile boolean running = false; @@ -46,7 +46,7 @@ public class DeadLetterConsumer implements Runnable, SmartLifecycle String clientId, String topic, String headerPrefix, - Consumer consumer) + Consumer consumer) { this.id = clientId; this.topic = topic; @@ -96,13 +96,13 @@ public class DeadLetterConsumer implements Runnable, SmartLifecycle { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); log.info("{} - Received {} messages", id, records.count()); List partitionsToPause = new LinkedList<>(); for (TopicPartition partition : records.partitions()) { - for (ConsumerRecord record : records.records(partition)) + for (ConsumerRecord record : records.records(partition)) { log.info( "{} - fetched partition={}-{}, offset={}: {}", @@ -215,14 +215,14 @@ public class DeadLetterConsumer implements Runnable, SmartLifecycle currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest; consumer.seek(fetchRequest.partition(), fetchRequest.offset()); } - Mono> requestRecord(int partition, long offset) + Mono> requestRecord(int partition, long offset) { if (partition >= numPartitions || partition < 0) { throw new NonExistentPartitionException(topic, partition); } - CompletableFuture> future = new CompletableFuture<>(); + CompletableFuture> future = new CompletableFuture<>(); FetchRequest fetchRequest = new FetchRequest( new TopicPartition(topic, partition), diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java index c0d13925..6d84e84f 100644 --- a/src/main/java/de/juplo/kafka/DeadLetterController.java +++ b/src/main/java/de/juplo/kafka/DeadLetterController.java @@ -29,7 +29,7 @@ public class DeadLetterController @GetMapping(path = "/{partition}/{offset}") - public Mono> recordAtOffset( + public Mono> recordAtOffset( @PathVariable int partition, @PathVariable long offset) { @@ -40,7 +40,7 @@ public class DeadLetterController .contentType(mediaType) .header( deadLetterConsumer.prefixed(DeadLetterConsumer.KEY), - UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8)) + UriUtils.encodePathSegment(new String(record.key()), StandardCharsets.UTF_8)) .header( deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP), Long.toString(record.timestamp())) diff --git a/src/main/java/de/juplo/kafka/FetchRequest.java b/src/main/java/de/juplo/kafka/FetchRequest.java index ff186b73..62028c38 100644 --- a/src/main/java/de/juplo/kafka/FetchRequest.java +++ b/src/main/java/de/juplo/kafka/FetchRequest.java @@ -9,7 +9,7 @@ import java.util.concurrent.CompletableFuture; public record FetchRequest( TopicPartition partition, long offset, - CompletableFuture> future) + CompletableFuture> future) { @Override public String toString() -- 2.39.5