import de.juplo.kafka.exceptions.NonExistentPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.header.Header;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
{
return deadLetterConsumer
.requestRecord(partition, offset)
- .map(record -> ResponseEntity
- .ok()
- .contentType(mediaType)
- .header(
+ .map(record ->
+ {
+ ResponseEntity.BodyBuilder builder = ResponseEntity.ok().contentType(mediaType);
+
+ builder.header(
prefixed(KEY),
- UriUtils.encodePathSegment(new String(record.key()), StandardCharsets.UTF_8))
- .header(
+ UriUtils.encodePathSegment(new String(record.key()), StandardCharsets.UTF_8));
+ builder.header(
prefixed(TIMESTAMP),
- Long.toString(record.timestamp()))
- .body(record.value()));
+ Long.toString(record.timestamp()));
+
+ record.headers().forEach(header -> builder.header(
+ UriUtils.encodePathSegment(prefixed(header.key()), StandardCharsets.UTF_8),
+ UriUtils.encodePathSegment(new String(header.value(), StandardCharsets.UTF_8), StandardCharsets.UTF_8)));
+
+ return builder.body(record.value());
+ });
}
String prefixed(String headerName)