// NOTE(review): this file is an unresolved unified-diff fragment -- '+'/'-'
// markers and dropped context lines remain, so it cannot compile as-is.
// Restore the file from version control and re-apply the patch cleanly.
public class ApplicationConfiguration
{
@Bean
// The patch renames the bean factory method from exampleConsumer to
// deadLetterConsumer; the method body is missing from this fragment
// (context lines were dropped).
- public DeadLetterConsumer exampleConsumer(
+ public DeadLetterConsumer deadLetterConsumer(
Consumer<String, String> kafkaConsumer,
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
{
Properties props = new Properties();
+
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("client.id", properties.getClientId());
props.put("group.id", properties.getConsumerProperties().getGroupId());
- if (properties.getConsumerProperties().getAutoOffsetReset() != null)
- {
- props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
- }
- if (properties.getConsumerProperties().getAutoCommitInterval() != null)
- {
- props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
- }
- props.put("metadata.max.age.ms", 5000); // 5 Sekunden
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
+ props.put("auto.offset.reset", "none");
return new KafkaConsumer<>(props);
}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Slf4j
// NOTE(review): the class declaration line (presumably
// "public class DeadLetterConsumer implements Runnable") was lost from this
// diff fragment -- only the opening brace survives below.
{
private final String id;
private final String topic;
// Number of partitions of 'topic', fixed once at construction time.
+ private final int numPartitions;
// One FIFO queue of pending fetch-requests per partition, indexed by
// partition number.
+ private final Queue<FetchRequest>[] pendingFetchRequests;
// The fetch-request currently being served per partition (null = idle).
+ private final FetchRequest[] currentFetchRequest;
private final Consumer<String, String> consumer;
private final Thread workerThread;
private final Runnable closeCallback;
- private long consumed = 0;
// NOTE(review): written by shutdown() from another thread and read in the
// worker loop -- should be volatile to guarantee visibility. In practice
// consumer.wakeup() also breaks the poll(), but do confirm.
+ private boolean shutdownIsRequested = false;
// Constructor fragment: the parameter list (apparently id, clientId, topic,
// consumer, closeCallback, ...) was dropped with the diff context lines.
public DeadLetterConsumer(
this.topic = topic;
this.consumer = consumer;
- workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
// Partition count is captured once here; later repartitioning of the topic
// is not handled.
+ numPartitions = consumer.partitionsFor(topic).size();
+ pendingFetchRequests = IntStream
+ .range(0, numPartitions)
// NOTE(review): element type should be FetchRequest, not String -- the raw
// Queue[] produced by toArray() hides the mismatch from the compiler.
+ .mapToObj(info -> new ConcurrentLinkedQueue<String>())
+ .toArray(size -> new Queue[size]);
+ currentFetchRequest = new FetchRequest[numPartitions];
+
+ workerThread = new Thread(this, clientId + "-worker-thread");
// NOTE(review): the worker thread is started before closeCallback is
// assigned below ('this' escapes an unfinished constructor) -- move
// start() to the end of the constructor, or confirm the worker can never
// touch closeCallback before the assignment happens.
workerThread.start();
this.closeCallback = closeCallback;
// NOTE(review): run() body fragment of an unresolved diff -- the
// "public void run()" header line, the "finally" keyword before the lone
// '{' near the bottom, and some closing braces were dropped with the
// diff context. Resolve against version control before compiling.
{
try
{
- log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic));
// Explicit partition assignment (no consumer-group rebalancing): one
// TopicPartition per index in [0, numPartitions).
+ List<TopicPartition> partitions = IntStream
+ .range(0, pendingFetchRequests.length)
+ .mapToObj(i -> new TopicPartition(topic, i))
+ .peek(partition -> log.info("{} - Assigning to {}", id, partition))
+ .collect(Collectors.toList());
- while (true)
- {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// All partitions start paused; each is resumed only while it has a
// fetch-request to serve (see the WakeupException branch below).
+ consumer.assign(partitions);
+ consumer.pause(partitions);
- log.info("{} - Received {} messages", id, records.count());
- for (ConsumerRecord<String, String> record : records)
+ // Without this, the first call to poll() triggers an NoOffsetForPartitionException, if the topic is empty
+ partitions.forEach(partition -> consumer.seek(partition, 0));
+
+ while (!shutdownIsRequested)
+ {
+ try
{
- handleRecord(
- record.topic(),
- record.partition(),
- record.offset(),
- record.key(),
- record.value());
// Long poll; requestRecord() and shutdown() interrupt it via wakeup().
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMinutes(1));
+
+ log.info("{} - Received {} messages", id, records.count());
+ List<TopicPartition> partitionsToPause = new LinkedList<>();
+ for (TopicPartition partition : records.partitions())
+ {
+ for (ConsumerRecord<String, String> record : records.records(partition))
+ {
+ log.info(
+ "{} - fetched partition={}-{}, offset={}: {}",
+ id,
+ topic,
+ record.partition(),
+ record.offset(),
+ record.key());
+
+ FetchRequest fetchRequest = currentFetchRequest[record.partition()];
+
// Complete the Mono that requestRecord() handed to the caller.
+ fetchRequest.future().complete(record.value());
// NOTE(review): currentFetchRequest[partition] is still non-null here, so
// schedulePendingFetchRequest() takes its "in progress" branch and returns
// empty -- the present-branch below looks unreachable, and it would also
// re-schedule a request that schedulePendingFetchRequest() has already
// scheduled internally. Verify the intended hand-over logic.
+ schedulePendingFetchRequest(record.partition()).ifPresentOrElse(
+ (nextFetchRequest) ->
+ {
+ scheduleFetchRequest(nextFetchRequest);
+ },
+ () ->
+ {
+ log.info("{} - no pending fetch-requests for {}", id, partition);
+ currentFetchRequest[record.partition()] = null;
+ partitionsToPause.add(fetchRequest.partition());
+ });
+
// Each fetch-request asks for exactly one record, so only the first
// record of every partition is consumed per poll.
+ break;
+ }
+ }
+
+ consumer.pause(partitionsToPause);
+ }
+ catch(WakeupException e)
+ {
// Woken by requestRecord() or shutdown(): promote pending fetch-requests
// and resume their partitions so the next poll() can serve them.
+ log.info("{} - Consumer was awakened", id);
+
+ List<TopicPartition> partitionsToResume = new LinkedList<>();
+
+ for (int partition = 0; partition < numPartitions; partition++)
+ {
+ schedulePendingFetchRequest(partition)
+ .map(fetchRequest -> fetchRequest.partition())
+ .ifPresent(topicPartition -> partitionsToResume.add(topicPartition));
+ }
+
+ consumer.resume(partitionsToResume);
}
}
}
- catch(WakeupException e)
- {
- log.info("{} - Consumer was signaled to finish its work", id);
- }
catch(Exception e)
{
log.error("{} - Unexpected error, unsubscribing!", id, e);
// NOTE(review): a "finally" line appears to be missing before this brace
// (and possibly a closeCallback invocation inside the catch above).
{
log.info("{} - Closing the KafkaConsumer", id);
consumer.close();
- log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+ log.info("{} - Exiting!", id);
}
}
- private void handleRecord(
- String topic,
- Integer partition,
- Long offset,
- String key,
- String value)
+ private Optional<FetchRequest> schedulePendingFetchRequest(int partition)
{
- consumed++;
- log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
+ if (currentFetchRequest[partition] == null)
+ {
+ FetchRequest nextFetchRequest = pendingFetchRequests[partition].poll();
+ if (nextFetchRequest != null)
+ {
+ scheduleFetchRequest(nextFetchRequest);
+ return Optional.of(nextFetchRequest);
+ }
+ else
+ {
+ log.trace("{} - no pending fetch-request for partition {}.", id, partition);
+ }
+ }
+ else
+ {
+ log.debug("{} - fetch-request {} is still in progress.", id, currentFetchRequest[partition]);
+ }
+
+ return Optional.empty();
}
+ private void scheduleFetchRequest(FetchRequest fetchRequest)
+ {
+ log.debug("{} - scheduling fetch-request {}.", id, fetchRequest);
+
+ currentFetchRequest[fetchRequest.partition().partition()] = fetchRequest;
+ consumer.seek(fetchRequest.partition(), fetchRequest.offset());
+ }
+ Mono<String> requestRecord(int partition, long offset)
+ {
+ CompletableFuture<String> future = new CompletableFuture<>();
+
+ FetchRequest fetchRequest = new FetchRequest(
+ new TopicPartition(topic, partition),
+ offset,
+ future);
+
+ pendingFetchRequests[partition].add(fetchRequest);
+
+ log.info(
+ "{} - fetch-request for partition={}, offset={}: Waking up consumer!",
+ id,
+ partition,
+ offset);
+ consumer.wakeup();
+
+ return Mono.fromFuture(future);
+ }
// Requests shutdown of the worker thread and blocks until it terminates.
// (The method's closing brace lies beyond this fragment.)
public void shutdown() throws InterruptedException
{
- log.info("{} - Waking up the consumer", id);
+ log.info("{} - Requesting shutdown", id);
// The flag is checked by the worker loop; wakeup() aborts a blocking poll()
// so the flag is seen promptly.
+ shutdownIsRequested = true;
consumer.wakeup();
log.info("{} - Joining the worker thread", id);
workerThread.join();