import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
+import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
@RequestMapping("/consumer")
private final AdminClient adminClient;
private final TransferRepository repository;
private final ObjectMapper mapper;
- private final ConsumerUseCases productionUseCases, restoreUseCases;
+ private final ConsumerUseCases restoreUseCases;
private boolean running = false;
private boolean shutdown = false;
private final Map<String, String> instanceIdUriMapping;
private final String[] instanceIdByPartition;
+
+ private Clock clock;
+ private int stateStoreInterval;
+
+ private final Consumer<ConsumerRecord<String, String>> productionRecordHandler;
+ private final Consumer<ConsumerRecord<String, String>> recordHandlers[];
+
private volatile boolean partitionOwnershipUnknown = true;
KafkaConsumer<String, String> consumer,
AdminClient adminClient,
TransferRepository repository,
+ Clock clock,
+ int stateStoreInterval,
ObjectMapper mapper,
ConsumerUseCases productionUseCases,
ConsumerUseCases restoreUseCases)
this.consumer = consumer;
this.adminClient = adminClient;
this.repository = repository;
+ this.clock = clock;
+ this.stateStoreInterval = stateStoreInterval;
this.mapper = mapper;
- this.productionUseCases = productionUseCases;
this.restoreUseCases = restoreUseCases;
+
+ productionRecordHandler = (record) -> handleRecord(record, productionUseCases);
+ this.recordHandlers = new Consumer[numPartitions];
}
@Override
public void run()
{
+ Instant stateStored = clock.instant();
+
while (running)
{
try
continue;
log.debug("polled {} records", records.count());
- records.forEach(record -> handleRecord(record, productionUseCases));
+ records.forEach(record -> recordHandlers[record.partition()].accept(record));
+
+ Instant now = clock.instant();
+ if (
+ stateStoreInterval > 0 &&
+ Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+ {
+ Map<Integer, Long> offsets = new HashMap<>();
+
+ for (TopicPartition topicPartition : consumer.assignment())
+ {
+ Integer partition = topicPartition.partition();
+ Long offset = consumer.position(topicPartition);
+ log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+ offsets.put(partition, offset);
+ }
+
+ repository.storeState(offsets);
+ stateStored = now;
+ }
}
catch (WakeupException e)
{
}
+ /**
+ * Identifies the URI, at which the Group-Instance can be reached,
+ * that holds the state for a specific {@link Transfer}.
+ *
+ * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+ * {@code key} here and of type {@code String}, because this example
+ * project stores the key as a String in Kafka to simplify the listing
+ * and manual manipulation of the according topic.
+ *
+ * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+ * @return An {@link Optional}, that holds the URI at which the Group-Instance
+ * can be reached, that holds the state for the {@link Transfer}, that
+ * is identified by the key (if present), or is empty, if the {@link Transfer}
+ * would be handled by the local instance.
+ */
public Optional<String> uriForKey(String key)
{
synchronized (this)
// Hence, the app would not react to any signal (CTRL-C, for example) except
// a KILL until the restoring is finished.
future = CompletableFuture.runAsync(() -> start());
+ log.info("start of application completed");
}
{
partitionOwnershipUnknown = true;
log.info("partitions revoked: {}", partitions);
+ for (TopicPartition topicPartition : partitions)
+ {
+ int partition = topicPartition.partition();
+ long offset = consumer.position(topicPartition);
+ log.info("deactivating partition {}, offset: {}", partition, offset);
+ repository.deactivatePartition(partition, offset);
+ }
}
@Override
log.info("partitions assigned: {}", partitions);
fetchAssignmentsAsync();
if (partitions.size() > 0)
- restore(partitions);
+ {
+ for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
+ {
+ TopicPartition topicPartition = entry.getKey();
+ Integer partition = topicPartition.partition();
+ long offset = repository.activatePartition(partition);
+ log.info("activated partition {}, seeking to offset {}", partition, offset);
+ consumer.seek(topicPartition, offset);
+ Long endOffset = entry.getValue();
+ if (offset < endOffset)
+ {
+ log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
+ recordHandlers[partition] = new RestoreRecordHandler(endOffset);
+ }
+ else
+ {
+ log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
+ recordHandlers[partition] = productionRecordHandler;
+ }
+ }
+ }
}
private void fetchAssignmentsAsync()
}
- private void restore(Collection<TopicPartition> partitions)
+ class RestoreRecordHandler implements Consumer<ConsumerRecord<String, String>>
{
- log.info("--> starting restore...");
-
- partitions
- .stream()
- .map(topicPartition -> topicPartition.partition())
- .forEach(partition -> repository.resetStorageForPartition(partition));
-
- Map<Integer, Long> lastSeen =
- consumer
- .endOffsets(partitions)
- .entrySet()
- .stream()
- .collect(Collectors.toMap(
- entry -> entry.getKey().partition(),
- entry -> entry.getValue() - 1));
-
- Map<Integer, Long> positions =
- lastSeen
- .keySet()
- .stream()
- .collect(Collectors.toMap(
- partition -> partition,
- partition -> 0l));
-
- while (
- positions
- .entrySet()
- .stream()
- .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
- .reduce(false, (a, b) -> a || b))
+ final long seen;
+
+
+ RestoreRecordHandler(Long endOffset)
{
- try
- {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
- if (records.count() == 0)
- continue;
+ this.seen = endOffset - 1;
+ }
- log.debug("polled {} records", records.count());
- records.forEach(record ->
- {
- handleRecord(record, restoreUseCases);
- positions.put(record.partition(), record.offset());
- });
+
+ @Override
+ public void accept(ConsumerRecord<String, String> record)
+ {
+ if (seen < record.offset())
+ {
+ int partition = record.partition();
+ log.info(
+ "--> restore of partition {} completed: needed={}, seen={}!",
+ partition,
+ seen,
+ record.offset());
+ recordHandlers[partition] = productionRecordHandler;
+ productionRecordHandler.accept(record);
}
- catch(WakeupException e)
+ else
{
- log.info("--> cleanly interrupted while restoring");
+ handleRecord(record, restoreUseCases);
+ if (seen == record.offset())
+ {
+ int partition = record.partition();
+ log.info( "--> restore of partition {} completed!", partition);
+ recordHandlers[partition] = productionRecordHandler;
+ }
}
}
-
- log.info("--> restore completed!");
}
+
@PostMapping("start")
public synchronized String start()
{
if (running)
{
- log.info("already running!");
+ log.info("consumer already running!");
return "Already running!";
}
running = true;
future = CompletableFuture.runAsync(this);
- log.info("started");
+ log.info("consumer started");
return "Started";
}
{
if (!running)
{
- log.info("not running!");
+ log.info("consumer not running!");
return "Not running";
}
consumer.unsubscribe();
}
- log.info("stopped");
+ log.info("consumer stopped");
return "Stopped";
}