new DeadLetterConsumer(
properties.getClientId(),
properties.getConsumer().getTopic(),
- properties.getConsumer().getHeaderPrefix(),
kafkaConsumer,
() -> applicationContext.close());
}
@NotNull
@NotEmpty
private String topic;
- @NotNull
- @NotEmpty
- private String headerPrefix;
}
@Validated
@NotNull
@NotEmpty
private MediaType mediaType;
+ @NotNull
+ @NotEmpty
+ private String headerPrefix;
}
}
@Slf4j
public class DeadLetterConsumer implements Runnable
{
- public final static String KEY = "KEY";
- public final static String TIMESTAMP = "TIMESTAMP";
-
-
private final String id;
private final String topic;
- private final String headerPrefix;
private final int numPartitions;
private final Queue<FetchRequest>[] pendingFetchRequests;
private final FetchRequest[] currentFetchRequest;
public DeadLetterConsumer(
String clientId,
String topic,
- String headerPrefix,
Consumer<byte[], byte[]> consumer,
Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
- this.headerPrefix = headerPrefix;
this.consumer = consumer;
numPartitions = consumer.partitionsFor(topic).size();
return Mono.fromFuture(future);
}
- String prefixed(String headerName)
- {
- return headerPrefix + headerName;
- }
-
public void shutdown() throws InterruptedException
{
log.info("{} - Requesting shutdown", id);
@RestController
public class DeadLetterController
{
+ public final static String KEY = "KEY";
+ public final static String TIMESTAMP = "TIMESTAMP";
+
+
private final DeadLetterConsumer deadLetterConsumer;
private final MediaType mediaType;
+ private final String headerPrefix;
+
public DeadLetterController(
{
this.deadLetterConsumer = deadLetterConsumer;
this.mediaType = properties.getController().getMediaType();
+ this.headerPrefix = properties.getController().getHeaderPrefix();
}
.ok()
.contentType(mediaType)
.header(
- deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
+ prefixed(KEY),
UriUtils.encodePathSegment(new String(record.key()), StandardCharsets.UTF_8))
.header(
- deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
+ prefixed(TIMESTAMP),
Long.toString(record.timestamp()))
.body(record.value()));
}
+ String prefixed(String headerName)
+ {
+ return headerPrefix + headerName;
+ }
+
+
@ResponseStatus(value= HttpStatus.NOT_FOUND)
@ExceptionHandler(OffsetOutOfRangeException.class)
public void notFound(OffsetOutOfRangeException e)
consumer:
group-id: nodlt
topic: test
- header-prefix: X-RECORD-
controller:
+ header-prefix: X-RECORD-
media-type: application/json
management:
endpoint:
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.consumer.topic=" + TOPIC,
- "juplo.consumer.header-prefix=" + HEADER_PREFIX,
+ "juplo.controller.header-prefix=" + HEADER_PREFIX,
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"logging.level.de.juplo.kafka=TRACE",
.isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
assertThat(response.getHeaders())
.containsEntry(
- HEADER_PREFIX + DeadLetterConsumer.KEY,
+ HEADER_PREFIX + DeadLetterController.KEY,
List.of(key));
assertThat(response.getHeaders())
.containsEntry(
- HEADER_PREFIX + DeadLetterConsumer.TIMESTAMP,
+ HEADER_PREFIX + DeadLetterController.TIMESTAMP,
List.of(Long.toString(recordMetadata.timestamp())));
assertThat(response.getBody())
.isEqualTo(value);