]> juplo.de Git - demos/kafka/training/commitdiff
ROT: Erwartungen für User-Header definiert
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 21:43:02 +0000 (23:43 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:47:38 +0000 (21:47 +0100)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 51beb455fa144a3d4b208b7bc371e5a3c862a5f1..824c6142744844d0e4cb9dcc880486a36fe7be28 100644 (file)
@@ -4,6 +4,7 @@ import com.jayway.jsonpath.JsonPath;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.AfterEach;
@@ -27,7 +28,9 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.web.util.UriUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -119,6 +122,12 @@ public class ApplicationTests
         Long.toString(recordMetadata.timestamp()));
     assertThat(response.getBody())
       .isEqualTo(value);
+
+    result.getProducerRecord().headers().forEach(header ->
+      assertThat(response.getHeaders().toSingleValueMap())
+        .containsEntry(
+          UriUtils.encodePathSegment(HEADER_PREFIX + header.key(), StandardCharsets.UTF_8),
+          UriUtils.encodePathSegment(new String(header.value()), StandardCharsets.UTF_8)));
   }
 
   @DisplayName("Not yet existing offset")
@@ -213,8 +222,18 @@ public class ApplicationTests
   {
     String key = Integer.toString(partition);
     String value = "Hällö Wöhrld!%? -- " + ++message;
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+      TOPIC,
+      partition,
+      key.getBytes(),
+      value.getBytes());
+    record
+      .headers()
+      .add("p%" + partition, "!§$%&/()ß#~üöä<;:|@€¢".getBytes())
+      .add(Integer.toString(message), "\"".getBytes())
+      .add("€", ":".getBytes());
     return kafkaTemplate
-      .send(TOPIC, partition, key.getBytes(), value.getBytes())
+      .send(record)
       .thenApply(result ->
       {
         RecordMetadata metadata = result.getRecordMetadata();