From: Kai Moritz Date: Mon, 7 Apr 2025 21:43:02 +0000 (+0200) Subject: ROT: Erwartungen für User-Header definiert X-Git-Tag: consumer/nodlt--2026-03-20--19-06~3 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=55fbe9538df07e8b3c041c43f8484cc67e4e762b;p=demos%2Fkafka%2Ftraining ROT: Erwartungen für User-Header definiert --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 6c07c061..01a025f9 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -4,8 +4,10 @@ import com.jayway.jsonpath.JsonPath; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -25,7 +27,9 @@ import org.springframework.http.ResponseEntity; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.web.util.UriUtils; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -111,6 +115,15 @@ public class ApplicationTests .containsExactly(Long.toString(recordMetadata.timestamp())); assertThat(response.getBody()) .isEqualTo(value); + + result.getProducerRecord().headers().forEach(header -> + assertThat(response.getHeaders().getValuesAsList(encode(HEADER_PREFIX + header.key()))) + .containsExactly(encode(new String(header.value())))); + } + + private static String encode(String value) + { + return UriUtils.encodePathSegment(value, StandardCharsets.UTF_8); } @DisplayName("Not yet existing offset") @@ -205,8 +218,18 @@ public class ApplicationTests { String key = Integer.toString(partition); String value = "Hällö Wöhrld!%? -- " + ++message; + ProducerRecord record = new ProducerRecord<>( + TOPIC, + partition, + key.getBytes(), + value.getBytes()); + record + .headers() + .add("p%" + partition, "!§$%&/()ß#~üöä<;:|@€¢".getBytes()) + .add(Integer.toString(message), "\"".getBytes()) + .add("€", ":".getBytes()); return kafkaTemplate - .send(TOPIC, partition, key.getBytes(), value.getBytes()) + .send(record) .thenApply(result -> { RecordMetadata metadata = result.getRecordMetadata();