From: Kai Moritz Date: Mon, 7 Apr 2025 21:43:02 +0000 (+0200) Subject: ROT: Erwartungen für User-Header definiert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9107d75fb5d39591974b980e7be3060fbfae5d8c;p=demos%2Fkafka%2Ftraining ROT: Erwartungen für User-Header definiert --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 1545131..834f85f 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -4,8 +4,10 @@ import com.jayway.jsonpath.JsonPath; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -25,7 +27,9 @@ import org.springframework.http.ResponseEntity; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.web.util.UriUtils; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -115,6 +119,11 @@ public class ApplicationTests List.of(Long.toString(recordMetadata.timestamp()))); assertThat(response.getBody()) .isEqualTo(value); + + result.getProducerRecord().headers().forEach(header -> assertThat(response.getHeaders()) + .containsEntry( + UriUtils.encodePathSegment(HEADER_PREFIX + header.key(), StandardCharsets.UTF_8), + List.of(UriUtils.encodePathSegment(new String(header.value()), StandardCharsets.UTF_8)))); } @DisplayName("Not yet existing offset") @@ -209,8 +218,18 @@ public class ApplicationTests { String key = Integer.toString(partition); String value = "Hällö Wöhrld!%? -- " + ++message; + ProducerRecord record = new ProducerRecord<>( + TOPIC, + partition, + key.getBytes(), + value.getBytes()); + record + .headers() + .add("p%" + partition, "!§$%&/()ß#~üöä<;:|@€¢".getBytes()) + .add(Integer.toString(message), "\"".getBytes()) + .add("€", ":".getBytes()); return kafkaTemplate - .send(TOPIC, partition, key.getBytes(), value.getBytes()) + .send(record) .thenApply(result -> { RecordMetadata metadata = result.getRecordMetadata();