The consumer passes key/value through as `byte[]`
author     Kai Moritz <kai@juplo.de>
           Mon, 7 Apr 2025 20:37:35 +0000 (22:37 +0200)
committer  Kai Moritz <kai@juplo.de>
           Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/DeadLetterConsumer.java
src/main/java/de/juplo/kafka/DeadLetterController.java
src/main/java/de/juplo/kafka/FetchRequest.java

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 8b11dad..3feee95 100644
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
@@ -17,7 +18,7 @@ public class ApplicationConfiguration
 {
   @Bean
   public DeadLetterConsumer deadLetterConsumer(
-    Consumer<String, String> kafkaConsumer,
+    Consumer<byte[], byte[]> kafkaConsumer,
     ApplicationProperties properties,
     ConfigurableApplicationContext applicationContext)
   {
@@ -31,15 +32,15 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "")
-  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<byte[], byte[]> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("client.id", properties.getClientId());
     props.put("group.id", properties.getConsumer().getGroupId());
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
+    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
     props.put("auto.offset.reset", "none");
 
     return new KafkaConsumer<>(props);
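
Not part of the commit: a minimal, self-contained sketch of the same consumer setup after the change, spelled with the ConsumerConfig constants instead of raw property names. The class and method names are hypothetical and the configuration values are placeholders supplied by the caller.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ByteArrayConsumerFactory
{
  // Builds a consumer that hands key and value through untouched as byte[]
  public static KafkaConsumer<byte[], byte[]> create(
      String bootstrapServer,
      String clientId,
      String groupId)
  {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    return new KafkaConsumer<>(props);
  }
}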
diff --git a/src/main/java/de/juplo/kafka/DeadLetterConsumer.java b/src/main/java/de/juplo/kafka/DeadLetterConsumer.java
index b8a32c5..a19cd3f 100644
@@ -34,7 +34,7 @@ public class DeadLetterConsumer implements Runnable
   private final int numPartitions;
   private final Queue<FetchRequest>[] pendingFetchRequests;
   private final FetchRequest[] currentFetchRequest;
-  private final Consumer<String, String> consumer;
+  private final Consumer<byte[], byte[]> consumer;
   private final Thread workerThread;
   private final Runnable closeCallback;
 
@@ -45,7 +45,7 @@ public class DeadLetterConsumer implements Runnable
     String clientId,
     String topic,
     String headerPrefix,
-    Consumer<String, String> consumer,
+    Consumer<byte[], byte[]> consumer,
     Runnable closeCallback)
   {
     this.id = clientId;
@@ -88,13 +88,13 @@ public class DeadLetterConsumer implements Runnable
       {
         try
         {
-          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
+          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMinutes(1));
 
           log.info("{} - Received {} messages", id, records.count());
           List<TopicPartition> partitionsToPause = new LinkedList<>();
           for (TopicPartition partition : records.partitions())
           {
-            for (ConsumerRecord<String, String> record : records.records(partition))
+            for (ConsumerRecord<byte[], byte[]> record : records.records(partition))
             {
               log.info(
                 "{} - fetched partition={}-{}, offset={}: {}",
@@ -209,14 +209,14 @@ public class DeadLetterConsumer implements Runnable
     currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest;
     consumer.seek(fetchRequest.partition(), fetchRequest.offset());
   }
-  Mono<ConsumerRecord<String, String>> requestRecord(int partition, long offset)
+  Mono<ConsumerRecord<byte[], byte[]>> requestRecord(int partition, long offset)
   {
     if (partition >= numPartitions || partition < 0)
     {
       throw new NonExistentPartitionException(topic, partition);
     }
 
-    CompletableFuture<ConsumerRecord<String, String>> future = new CompletableFuture<>();
+    CompletableFuture<ConsumerRecord<byte[], byte[]>> future = new CompletableFuture<>();
 
     FetchRequest fetchRequest = new FetchRequest(
       new TopicPartition(topic, partition),
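
Since requestRecord() now emits the record with raw byte[] key and value, decoding back to text becomes the caller's job. A hedged sketch of such a helper (not introduced by the commit; the UTF-8 charset is an assumption, the commit only pins it for the key header in the controller):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.publisher.Mono;

public class RecordDecoder
{
  // Hypothetical helper: turns the raw record value back into text on the subscriber side
  public static Mono<String> valueAsText(Mono<ConsumerRecord<byte[], byte[]>> record)
  {
    return record.map(r -> new String(r.value(), StandardCharsets.UTF_8));
  }
}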
diff --git a/src/main/java/de/juplo/kafka/DeadLetterController.java b/src/main/java/de/juplo/kafka/DeadLetterController.java
index c0d1392..6d84e84 100644
@@ -29,7 +29,7 @@ public class DeadLetterController
 
 
   @GetMapping(path = "/{partition}/{offset}")
-  public Mono<ResponseEntity<String>> recordAtOffset(
+  public Mono<ResponseEntity<byte[]>> recordAtOffset(
     @PathVariable int partition,
     @PathVariable long offset)
   {
@@ -40,7 +40,7 @@ public class DeadLetterController
         .contentType(mediaType)
         .header(
           deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
-          UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8))
+          UriUtils.encodePathSegment(new String(record.key(), StandardCharsets.UTF_8), StandardCharsets.UTF_8))
         .header(
           deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
           Long.toString(record.timestamp()))
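
With the controller returning byte[], the endpoint serves the record value verbatim while the key travels URL-encoded in a prefixed response header. A hedged client-side sketch, not part of the commit: base URL, port, path layout and the header name "X-KEY" are assumptions that depend on the deployed configuration and the configured header prefix.

import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.WebClient;

public class DeadLetterClientSketch
{
  public static void main(String[] args)
  {
    WebClient client = WebClient.create("http://localhost:8080");
    // Fetch the raw record bytes for partition 0, offset 42
    ResponseEntity<byte[]> entity = client
        .get()
        .uri("/{partition}/{offset}", 0, 42)
        .retrieve()
        .toEntity(byte[].class)
        .block();
    // "X-KEY" is only a placeholder for the prefixed key header
    System.out.println("key header: " + entity.getHeaders().getFirst("X-KEY"));
    System.out.println("body bytes: " + (entity.getBody() == null ? 0 : entity.getBody().length));
  }
}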
diff --git a/src/main/java/de/juplo/kafka/FetchRequest.java b/src/main/java/de/juplo/kafka/FetchRequest.java
index ff186b7..62028c3 100644
@@ -9,7 +9,7 @@ import java.util.concurrent.CompletableFuture;
 public record FetchRequest(
   TopicPartition partition,
   long offset,
-  CompletableFuture<ConsumerRecord<String, String>> future)
+  CompletableFuture<ConsumerRecord<byte[], byte[]>> future)
 {
   @Override
   public String toString()
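
A hedged sketch of how a FetchRequest carrying the retyped CompletableFuture could be bridged into the Mono that requestRecord() hands back; this bridging code is not shown in the commit, and the class and method names are hypothetical.

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
import de.juplo.kafka.FetchRequest;

public class FetchRequestBridge
{
  public static Mono<ConsumerRecord<byte[], byte[]>> toMono(TopicPartition partition, long offset)
  {
    CompletableFuture<ConsumerRecord<byte[], byte[]>> future = new CompletableFuture<>();
    // The worker thread would enqueue this request and complete the future
    // once it has polled the record at the requested offset.
    FetchRequest request = new FetchRequest(partition, offset, future);
    return Mono.fromFuture(future);
  }
}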