import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
+import java.nio.ByteBuffer;
+import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
private final Map<String, String> instanceIdUriMapping;
private final String[] instanceIdByPartition;
+ private final Clock clock;
+ private final int stateStoreInterval;
+
private volatile boolean partitionOwnershipUnknown = true;
public TransferConsumer(
+ String bootstrapServers,
+ String groupId,
+ String groupInstanceId,
String topic,
int numPartitions,
Map<String, String> instanceIdUriMapping,
- KafkaConsumer<String, String> consumer,
AdminClient adminClient,
TransferRepository repository,
+ Clock clock,
+ int stateStoreInterval,
ObjectMapper mapper,
ConsumerUseCases productionUseCases,
ConsumerUseCases restoreUseCases)
{
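+ // The consumer is now created inside this class: a fixed group.instance.id
+ // makes it a static member of the consumer group, and the custom Assignor
+ // (see below) is registered as the partition.assignment.strategy.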
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+ props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Assignor.class.getName());
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ this.consumer = new KafkaConsumer<>(props);
+
+ this.groupId = groupId;
+ this.groupInstanceId = groupInstanceId;
this.topic = topic;
this.numPartitions = numPartitions;
- this.groupId = consumer.groupMetadata().groupId();
- this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
this.instanceIdByPartition = new String[numPartitions];
this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
for (String instanceId : instanceIdUriMapping.keySet())
{
  String uri = instanceIdUriMapping.get(instanceId);
this.instanceIdUriMapping.put(instanceId, uri);
}
- this.consumer = consumer;
this.adminClient = adminClient;
this.repository = repository;
+ this.clock = clock;
+ this.stateStoreInterval = stateStoreInterval;
this.mapper = mapper;
this.productionUseCases = productionUseCases;
this.restoreUseCases = restoreUseCases;
@Override
public void run()
{
+ Instant stateStored = clock.instant();
+
while (running)
{
try
log.debug("polled {} records", records.count());
records.forEach(record -> handleRecord(record, productionUseCases));
+
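+ // Periodically store the consumer's current position for every assigned
+ // partition in the local repository, so that a restart can resume from the
+ // stored state instead of replaying the whole topic.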
+ Instant now = clock.instant();
+ if (
+ stateStoreInterval > 0 &&
+ Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+ {
+ Map<Integer, Long> offsets = new HashMap<>();
+
+ for (TopicPartition topicPartition : consumer.assignment())
+ {
+ Integer partition = topicPartition.partition();
+ Long offset = consumer.position(topicPartition);
+ log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+ offsets.put(partition, offset);
+ }
+
+ repository.storeState(offsets);
+ stateStored = now;
+ }
}
catch (WakeupException e)
{
}
+ /**
+  * Identifies the URI of the group instance that holds the state for a
+  * specific {@link Transfer}.
+  *
+  * The {@link Transfer#getId() ID} of the {@link Transfer} is called
+  * {@code key} here and is of type {@code String}, because this example
+  * project stores the key as a string in Kafka, in order to simplify the
+  * listing and manual manipulation of the corresponding topic.
+  *
+  * @param key A {@code String} that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+  * @return An {@link Optional} that holds the URI of the group instance
+  *         that holds the state for the {@link Transfer} identified by the
+  *         key, or is empty, if the {@link Transfer} is handled by the
+  *         local instance.
+  */
public Optional<String> uriForKey(String key)
{
synchronized (this)
// Hence, the app would not react to any signal (CTRL-C, for example) except
// a KILL until the restoring is finished.
future = CompletableFuture.runAsync(() -> start());
+ log.info("start of application completed");
}
{
partitionOwnershipUnknown = true;
log.info("partitions revoked: {}", partitions);
+ for (TopicPartition topicPartition : partitions)
+ {
+ int partition = topicPartition.partition();
+ long offset = consumer.position(topicPartition);
+ log.info("deactivating partition {}, offset: {}", partition, offset);
+ repository.deactivatePartition(partition, offset);
+ }
}
@Override
log.info("partitions assigned: {}", partitions);
fetchAssignmentsAsync();
if (partitions.size() > 0)
+ {
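+ // Reactivate the locally stored state for each newly assigned partition and
+ // seek to the stored offset, so that only newer records are read.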
+ for (TopicPartition topicPartition : partitions)
+ {
+ int partition = topicPartition.partition();
+ long offset = repository.activatePartition(partition);
+ log.info("activated partition {}, seeking to offset {}", partition, offset);
+ consumer.seek(topicPartition, offset);
+ }
+
restore(partitions);
+ }
}
private void fetchAssignmentsAsync()
{
log.info("--> starting restore...");
- partitions
- .stream()
- .map(topicPartition -> topicPartition.partition())
- .forEach(partition -> repository.resetStorageForPartition(partition));
-
Map<Integer, Long> lastSeen =
  consumer
    .endOffsets(partitions)
    .entrySet()
    .stream()
    .collect(Collectors.toMap(
      entry -> entry.getKey().partition(),
      entry -> entry.getValue()));
Map<Integer, Long> positions = lastSeen
  .keySet()
  .stream()
  .collect(Collectors.toMap(
    partition -> partition,
-    partition -> 0l));
+    partition -> repository.storedPosition(partition)));
while (
positions
{
if (running)
{
- log.info("already running!");
+ log.info("consumer already running!");
return "Already running!";
}
running = true;
future = CompletableFuture.runAsync(this);
- log.info("started");
+ log.info("consumer started");
return "Started";
}
{
if (!running)
{
- log.info("not running!");
+ log.info("consumer not running!");
return "Not running";
}
consumer.unsubscribe();
}
- log.info("stopped");
+ log.info("consumer stopped");
return "Stopped";
}
GetTransferUseCase,
CreateTransferUseCase,
HandleStateChangeUseCase {};
+
+ public static class Assignor extends CooperativeStickyAssignor
+ {
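+ // Subclass of the CooperativeStickyAssignor that is registered as the
+ // partition.assignment.strategy: for now it only delegates and logs, but
+ // it provides the hooks for customizing the assignment later.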
+ @Override
+ public GroupAssignment assign(
+ Cluster metadata,
+ GroupSubscription groupSubscription)
+ {
+ return super.assign(metadata, groupSubscription);
+ }
+
+ @Override
+ public ByteBuffer subscriptionUserData(Set<String> topics)
+ {
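+ // Do not send any custom user data along with the subscription.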
+ return null;
+ }
+
+ @Override
+ public void onAssignment(
+ Assignment assignment,
+ ConsumerGroupMetadata metadata)
+ {
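+ // Log the assignment that this member received from the group leader.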
+ log.info("New assignment: {}, {}", assignment, metadata);
+ }
+ }
}