ROT: Erwartungen für User-Header definiert
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 21:43:02 +0000 (23:43 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 1545131..834f85f 100644 (file)
@@ -4,8 +4,10 @@ import com.jayway.jsonpath.JsonPath;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
@@ -25,7 +27,9 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.web.util.UriUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -115,6 +119,11 @@ public class ApplicationTests
         List.of(Long.toString(recordMetadata.timestamp())));
     assertThat(response.getBody())
       .isEqualTo(value);
+
+    result.getProducerRecord().headers().forEach(header -> assertThat(response.getHeaders())
+        .containsEntry(
+          UriUtils.encodePathSegment(HEADER_PREFIX + header.key(), StandardCharsets.UTF_8),
+          List.of(UriUtils.encodePathSegment(new String(header.value()), StandardCharsets.UTF_8))));
   }
 
   @DisplayName("Not yet existing offset")
@@ -209,8 +218,18 @@ public class ApplicationTests
   {
     String key = Integer.toString(partition);
     String value = "Hällö Wöhrld!%? -- " + ++message;
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+      TOPIC,
+      partition,
+      key.getBytes(),
+      value.getBytes());
+    record
+      .headers()
+      .add("p%" + partition, "!§$%&/()ß#~üöä<;:|@€¢".getBytes())
+      .add(Integer.toString(message), "\"".getBytes())
+      .add("€", ":".getBytes());
     return kafkaTemplate
-      .send(TOPIC, partition, key.getBytes(), value.getBytes())
+      .send(record)
       .thenApply(result ->
       {
         RecordMetadata metadata = result.getRecordMetadata();