import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.web.util.UriUtils;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
List.of(Long.toString(recordMetadata.timestamp())));
assertThat(response.getBody())
.isEqualTo(value);
+
+ result.getProducerRecord().headers().forEach(header -> assertThat(response.getHeaders())
+ .containsEntry(
+ UriUtils.encodePathSegment(HEADER_PREFIX + header.key(), StandardCharsets.UTF_8),
+ List.of(UriUtils.encodePathSegment(new String(header.value()), StandardCharsets.UTF_8))));
}
@DisplayName("Not yet existing offset")
{
String key = Integer.toString(partition);
String value = "Hällö Wöhrld!%? -- " + ++message;
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ TOPIC,
+ partition,
+ key.getBytes(),
+ value.getBytes());
+ record
+ .headers()
+ .add("p%" + partition, "!§$%&/()ß#~üöä<;:|@€¢".getBytes())
+ .add(Integer.toString(message), "\"".getBytes())
+ .add("€", ":".getBytes());
return kafkaTemplate
- .send(TOPIC, partition, key.getBytes(), value.getBytes())
+ .send(record)
.thenApply(result ->
{
RecordMetadata metadata = result.getRecordMetadata();