import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
+import java.time.Clock;
import java.time.Duration;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@RequestMapping("/consumer")
@ResponseBody
-@RequiredArgsConstructor
@Slf4j
-public class TransferConsumer implements Runnable
+public class TransferConsumer implements Runnable, ConsumerRebalanceListener
{
private final String topic;
+ private final int numPartitions;
private final KafkaConsumer<String, String> consumer;
+ private final AdminClient adminClient;
+ private final TransferRepository repository;
private final ObjectMapper mapper;
private final ConsumerUseCases productionUseCases, restoreUseCases;
- private boolean restoring = true;
private boolean running = false;
private boolean shutdown = false;
private Future<?> future = null;
+ private final String groupId;
+ private final String groupInstanceId;
+ private final Map<String, String> instanceIdUriMapping;
+ private final String[] instanceIdByPartition;
+
+ private Clock clock;
+ private int stateStoreInterval;
+
+ private volatile boolean partitionOwnershipUnknown = true;
+
+
+ public TransferConsumer(
+ String topic,
+ int numPartitions,
+ Map<String, String> instanceIdUriMapping,
+ KafkaConsumer<String, String> consumer,
+ AdminClient adminClient,
+ TransferRepository repository,
+ Clock clock,
+ int stateStoreInterval,
+ ObjectMapper mapper,
+ ConsumerUseCases productionUseCases,
+ ConsumerUseCases restoreUseCases)
+ {
+ this.topic = topic;
+ this.numPartitions = numPartitions;
+ this.groupId = consumer.groupMetadata().groupId();
+ this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
+ this.instanceIdByPartition = new String[numPartitions];
+ this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+ for (String instanceId : instanceIdUriMapping.keySet())
+ {
+ // Requests are not redirected for the instance itself
+ String uri = instanceId.equals(groupInstanceId)
+ ? null
+ : instanceIdUriMapping.get(instanceId);
+ this.instanceIdUriMapping.put(instanceId, uri);
+ }
+ this.consumer = consumer;
+ this.adminClient = adminClient;
+ this.repository = repository;
+ this.clock = clock;
+ this.stateStoreInterval = stateStoreInterval;
+ this.mapper = mapper;
+ this.productionUseCases = productionUseCases;
+ this.restoreUseCases = restoreUseCases;
+ }
+
@Override
public void run()
{
+ Instant stateStored = clock.instant();
+
while (running)
{
try
log.debug("polled {} records", records.count());
records.forEach(record -> handleRecord(record, productionUseCases));
+
+ Instant now = clock.instant();
+ if (
+ stateStoreInterval > 0 &&
+ Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+ {
+ Map<Integer, Long> offsets = new HashMap<>();
+
+ for (TopicPartition topicPartition : consumer.assignment())
+ {
+ Integer partition = topicPartition.partition();
+ Long offset = consumer.position(topicPartition);
+ log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+ offsets.put(partition, offset);
+ }
+
+ repository.storeState(offsets);
+ stateStored = now;
+ }
}
catch (WakeupException e)
{
}
}
+
+ /**
+ * Identifies the URI, at which the Group-Instance can be reached,
+ * that holds the state for a specific {@link Transfer}.
+ *
+ * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+ * {@code key} here and of type {@code String}, because this example
+ * project stores the key as a String in Kafka to simplify the listing
+ * and manual manipulation of the according topic.
+ *
+ * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+ * @return An {@link Optional}, that holds the URI at which the Group-Instance
+ * can be reached, that holds the state for the {@link Transfer}, that
+ * is identified by the key (if present), or is empty, if the {@link Transfer}
+ * would be handled by the local instance.
+ */
+ public Optional<String> uriForKey(String key)
+ {
+ synchronized (this)
+ {
+ while (partitionOwnershipUnknown)
+ {
+ try { wait(); } catch (InterruptedException e) {}
+ }
+
+ int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+ return
+ Optional
+ .ofNullable(instanceIdByPartition[partition])
+ .map(id -> instanceIdUriMapping.get(id));
+ }
+ }
+
@EventListener
public synchronized void onApplicationEvent(ContextRefreshedEvent event)
{
- // Needed, because this method is called synchronously during the
- // initialization pahse of Spring. If the restoring is processed
+ // "Needed", because this method is called synchronously during the
+ // initialization pahse of Spring. If the subscription happens
// in the same thread, it would block the completion of the initialization.
// Hence, the app would not react to any signal (CTRL-C, for example) except
// a KILL until the restoring is finished.
- future = CompletableFuture.runAsync(() -> restore());
+ future = CompletableFuture.runAsync(() -> start());
+ log.info("start of application completed");
}
- private void restore()
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
- log.info("--> starting restore...");
+ partitionOwnershipUnknown = true;
+ log.info("partitions revoked: {}", partitions);
+ for (TopicPartition topicPartition : partitions)
+ {
+ int partition = topicPartition.partition();
+ long offset = consumer.position(topicPartition);
+ log.info("deactivating partition {}, offset: {}", partition, offset);
+ repository.deactivatePartition(partition, offset);
+ }
+ }
- List<TopicPartition> partitions =
- consumer
- .partitionsFor(topic)
- .stream()
- .map(info -> new TopicPartition(topic, info.partition()))
- .collect(Collectors.toList());
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("partitions assigned: {}", partitions);
+ fetchAssignmentsAsync();
+ if (partitions.size() > 0)
+ {
+ for (TopicPartition topicPartition : partitions)
+ {
+ int partition = topicPartition.partition();
+ long offset = repository.activatePartition(partition);
+ log.info("activated partition {}, seeking to offset {}", partition, offset);
+ consumer.seek(topicPartition, offset);
+ }
+
+ restore(partitions);
+ }
+ }
+
+ private void fetchAssignmentsAsync()
+ {
+ adminClient
+ .describeConsumerGroups(List.of(groupId))
+ .describedGroups()
+ .get(groupId)
+ .whenComplete((descriptions, e) ->
+ {
+ if (e != null)
+ {
+ log.error("could not fetch group data: {}", e.getMessage());
+ }
+ else
+ {
+ synchronized (this)
+ {
+ for (MemberDescription description : descriptions.members())
+ {
+ description
+ .assignment()
+ .topicPartitions()
+ .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+ }
+ partitionOwnershipUnknown = false;
+ notifyAll();
+ }
+ }
+ });
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ partitionOwnershipUnknown = true;
+ log.info("partiotions lost: {}", partitions);
+ }
+
+
+ private void restore(Collection<TopicPartition> partitions)
+ {
+ log.info("--> starting restore...");
Map<Integer, Long> lastSeen =
consumer
.stream()
.collect(Collectors.toMap(
partition -> partition,
- partition -> 0l));
-
- log.info("assigning {}}", partitions);
- consumer.assign(partitions);
+ partition -> repository.storedPosition(partition)));
while (
- restoring &&
positions
.entrySet()
.stream()
catch(WakeupException e)
{
log.info("--> cleanly interrupted while restoring");
- return;
}
}
log.info("--> restore completed!");
- restoring = false;
-
- // We are intentionally _not_ unsubscribing here, since that would
- // reset the offset to _earliest_, because we disabled offset-commits.
-
- start();
}
@PostMapping("start")
public synchronized String start()
{
- if (restoring)
+ if (running)
{
- log.error("cannot start while restoring");
- return "Denied: Restoring!";
+ log.info("consumer already running!");
+ return "Already running!";
}
- String result = "Started";
-
- if (running)
+ int foundNumPartitions = consumer.partitionsFor(topic).size();
+ if (foundNumPartitions != numPartitions)
{
- stop();
- result = "Restarted";
+ log.error(
+ "unexpected number of partitions for topic {}: expected={}, found={}",
+ topic,
+ numPartitions,
+ foundNumPartitions
+ );
+ return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
}
+ consumer.subscribe(List.of(topic), this);
+
running = true;
future = CompletableFuture.runAsync(this);
- log.info("started");
- return result;
+ log.info("consumer started");
+ return "Started";
}
@PostMapping("stop")
public synchronized String stop()
{
- if (!(running || restoring))
+ if (!running)
{
- log.info("not running!");
+ log.info("consumer not running!");
return "Not running";
}
finally
{
future = null;
+ consumer.unsubscribe();
}
- log.info("stopped");
+ log.info("consumer stopped");
return "Stopped";
}
}
+
public interface ConsumerUseCases
extends
GetTransferUseCase,