return
new DeadLetterConsumer(
properties.getClientId(),
- properties.getConsumerProperties().getTopic(),
+ properties.getConsumer().getTopic(),
+ properties.getConsumer().getHeaderPrefix(),
kafkaConsumer,
() -> applicationContext.close());
}
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("client.id", properties.getClientId());
- props.put("group.id", properties.getConsumerProperties().getGroupId());
+ props.put("group.id", properties.getConsumer().getGroupId());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("auto.offset.reset", "none");
private ConsumerProperties consumer;
- public ConsumerProperties getConsumerProperties()
- {
- return consumer;
- }
-
-
@Validated
@Getter
@Setter
@NotNull
@NotEmpty
private String topic;
+ @NotNull
+ @NotEmpty
+ private String headerPrefix;
}
}
private final String id;
private final String topic;
+ private final String headerPrefix;
private final int numPartitions;
private final Queue<FetchRequest>[] pendingFetchRequests;
private final FetchRequest[] currentFetchRequest;
public DeadLetterConsumer(
String clientId,
String topic,
+ String headerPrefix,
Consumer<String, String> consumer,
Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
+ this.headerPrefix = headerPrefix;
this.consumer = consumer;
numPartitions = consumer.partitionsFor(topic).size();
FetchRequest fetchRequest = currentFetchRequest[record.partition()];
- fetchRequest.future().complete(record.value());
+ fetchRequest.future().complete(record);
schedulePendingFetchRequest(record.partition()).ifPresentOrElse(
(nextFetchRequest) -> scheduleFetchRequest(nextFetchRequest),
() ->
currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest;
consumer.seek(fetchRequest.partition(), fetchRequest.offset());
}
- Mono<String> requestRecord(int partition, long offset)
+ Mono<ConsumerRecord<String, String>> requestRecord(int partition, long offset)
{
if (partition >= numPartitions || partition < 0)
{
throw new NonExistentPartitionException(topic, partition);
}
- CompletableFuture<String> future = new CompletableFuture<>();
+ CompletableFuture<ConsumerRecord<String, String>> future = new CompletableFuture<>();
FetchRequest fetchRequest = new FetchRequest(
new TopicPartition(topic, partition),
String prefixed(String headerName)
{
- return headerName;
+ return headerPrefix + headerName;
}
public void shutdown() throws InterruptedException
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
+import org.springframework.web.util.UriUtils;
import reactor.core.publisher.Mono;
+import java.nio.charset.StandardCharsets;
+
@RestController
public class DeadLetterController
@GetMapping(path = "/{partition}/{offset}")
- public Mono<String> recordAtOffset(
+ public Mono<ResponseEntity<String>> recordAtOffset(
@PathVariable int partition,
@PathVariable long offset)
{
- return deadLetterConsumer.requestRecord(partition, offset);
+ return deadLetterConsumer
+ .requestRecord(partition, offset)
+ .map(record -> ResponseEntity
+ .ok()
+ .header(
+ deadLetterConsumer.prefixed(DeadLetterConsumer.KEY),
+ UriUtils.encodePathSegment(record.key(), StandardCharsets.UTF_8))
+ .header(
+ deadLetterConsumer.prefixed(DeadLetterConsumer.TIMESTAMP),
+ Long.toString(record.timestamp()))
+ .body(record.value()));
}
@ResponseStatus(value= HttpStatus.NOT_FOUND)
package de.juplo.kafka;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import java.util.concurrent.CompletableFuture;
public record FetchRequest(
TopicPartition partition,
long offset,
- CompletableFuture<String> future)
+ CompletableFuture<ConsumerRecord<String, String>> future)
{
@Override
public String toString()
consumer:
group-id: nodlt
topic: test
+ header-prefix: X-RECORD-
management:
endpoint:
shutdown: