import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.http.MediaType;
import org.springframework.validation.annotation.Validated;
private String clientId;
@NotNull
- private ConsumerProperties consumer;
+ private ApplicationProperties.Consumer consumer;
+ @NotNull
+ private ApplicationProperties.Controller controller;
@Validated
@Getter
@Setter
- static class ConsumerProperties
+ static class Consumer
{
@NotNull
@NotEmpty
@NotEmpty
private String headerPrefix;
}
+
+ @Validated
+ @Getter
+ @Setter
+ static class Controller
+ {
+ @NotNull
+ @NotEmpty
+ private MediaType mediaType;
+ }
}
import de.juplo.kafka.exceptions.NonExistentPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.util.UriUtils;
@RestController
public class DeadLetterController
{
- @Autowired
- DeadLetterConsumer deadLetterConsumer;
+ private final DeadLetterConsumer deadLetterConsumer;
+ private final MediaType mediaType;
+
+
+ public DeadLetterController(
+ DeadLetterConsumer deadLetterConsumer,
+ ApplicationProperties properties)
+ {
+ this.deadLetterConsumer = deadLetterConsumer;
+ this.mediaType = properties.getController().getMediaType();
+ }
@GetMapping(path = "/{partition}/{offset}")
.requestRecord(partition, offset)
.map(record -> ResponseEntity
.ok()
+ .contentType(mediaType)
.header(
deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8))