juplo:
transfer:
group-instance-id: peter
+ state-store-interval: 15
+ local-state-store-path: state.bin
import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import de.juplo.kafka.payment.transfer.ports.TransferService;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Clock;
import java.util.Optional;
import java.util.Properties;
KafkaConsumer<String, String> consumer,
AdminClient adminClient,
TransferRepository repository,
+ LocalStateStoreSettings localStateStoreSettings,
ObjectMapper mapper,
TransferService productionTransferService,
TransferService restoreTransferService)
consumer,
adminClient,
repository,
+ Clock.systemDefaultZone(),
+ localStateStoreSettings.interval,
mapper,
new TransferConsumer.ConsumerUseCases() {
@Override
return new KafkaMessagingService(producer, mapper, properties.getTopic());
}
+ @RequiredArgsConstructor
+ static class LocalStateStoreSettings
+ {
+ final Optional<File> file;
+ final int interval;
+ }
+
+ @Bean
+ LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties)
+ {
+ if (properties.getStateStoreInterval() < 1)
+ {
+ log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated");
+ return new LocalStateStoreSettings(Optional.empty(), 0);
+ }
+
+ if (!StringUtils.hasText(properties.getLocalStateStorePath()))
+ {
+ log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!");
+ return new LocalStateStoreSettings(Optional.empty(), 0);
+ }
+
+ Path path = Path.of(properties.getLocalStateStorePath());
+ log.info("using {} as local state store", path.toAbsolutePath());
+
+ if (Files.notExists(path))
+ {
+ try
+ {
+ Files.createFile(path);
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath());
+ }
+ }
+
+ if (!(Files.isReadable(path) && Files.isWritable(path)))
+ {
+ throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath());
+ }
+
+ return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval());
+ }
+
@Bean
InMemoryTransferRepository inMemoryTransferRepository(
+ LocalStateStoreSettings localStateStoreSettings,
TransferServiceProperties properties,
ObjectMapper mapper)
{
- return new InMemoryTransferRepository(properties.getNumPartitions(), mapper);
+ return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper);
}
@Bean
private String groupId = "transfers";
private String groupInstanceId;
private Map<String, String> instanceIdUriMapping;
+ private String localStateStorePath;
+ private int stateStoreInterval = 60;
public Map<String, String> getInstanceIdUriMapping()
{
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
+import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
private final Map<String, String> instanceIdUriMapping;
private final String[] instanceIdByPartition;
+ private Clock clock;
+ private int stateStoreInterval;
+
private volatile boolean partitionOwnershipUnknown = true;
KafkaConsumer<String, String> consumer,
AdminClient adminClient,
TransferRepository repository,
+ Clock clock,
+ int stateStoreInterval,
ObjectMapper mapper,
ConsumerUseCases productionUseCases,
ConsumerUseCases restoreUseCases)
this.consumer = consumer;
this.adminClient = adminClient;
this.repository = repository;
+ this.clock = clock;
+ this.stateStoreInterval = stateStoreInterval;
this.mapper = mapper;
this.productionUseCases = productionUseCases;
this.restoreUseCases = restoreUseCases;
@Override
public void run()
{
+ Instant stateStored = clock.instant();
+
while (running)
{
try
log.debug("polled {} records", records.count());
records.forEach(record -> handleRecord(record, productionUseCases));
+
+ Instant now = clock.instant();
+ if (
+ stateStoreInterval > 0 &&
+ Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+ {
+ Map<Integer, Long> offsets = new HashMap<>();
+
+ for (TopicPartition topicPartition : consumer.assignment())
+ {
+ Integer partition = topicPartition.partition();
+ Long offset = consumer.position(topicPartition);
+ log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+ offsets.put(partition, offset);
+ }
+
+ repository.storeState(offsets);
+ stateStored = now;
+ }
}
catch (WakeupException e)
{
{
partitionOwnershipUnknown = true;
log.info("partitions revoked: {}", partitions);
+ for (TopicPartition topicPartition : partitions)
+ {
+ int partition = topicPartition.partition();
+ long offset = consumer.position(topicPartition);
+ log.info("deactivating partition {}, offset: {}", partition, offset);
+ repository.deactivatePartition(partition, offset);
+ }
}
@Override
log.info("partitions assigned: {}", partitions);
fetchAssignmentsAsync();
if (partitions.size() > 0)
+ {
+ for (TopicPartition topicPartition : partitions)
+ {
+ int partition = topicPartition.partition();
+ long offset = repository.activatePartition(partition);
+ log.info("activated partition {}, seeking to offset {}", partition, offset);
+ consumer.seek(topicPartition, offset);
+ }
+
restore(partitions);
+ }
}
private void fetchAssignmentsAsync()
{
log.info("--> starting restore...");
- partitions
- .stream()
- .map(topicPartition -> topicPartition.partition())
- .forEach(partition -> repository.resetStorageForPartition(partition));
-
Map<Integer, Long> lastSeen =
consumer
.endOffsets(partitions)
.stream()
.collect(Collectors.toMap(
partition -> partition,
- partition -> 0l));
+ partition -> repository.storedPosition(partition)));
while (
positions
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.extern.slf4j.Slf4j;
+import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.IntStream;
@Slf4j
public class InMemoryTransferRepository implements TransferRepository
{
private final int numPartitions;
- private final Map<Long, String> mappings[];
private final ObjectMapper mapper;
+ private final Data data;
+ private final Optional<File> stateStore;
- public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+
+ public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
{
+ this.stateStore = stateStore;
this.numPartitions = numPartitions;
- this.mappings = new HashMap[numPartitions];
- for (int i = 0; i < numPartitions; i++)
- this.mappings[i] = new HashMap<>();
this.mapper = mapper;
+
+ Data data = null;
+ try
+ {
+ if (stateStore.isPresent())
+ {
+ try (
+ FileInputStream fis = new FileInputStream(stateStore.get());
+ ObjectInputStream ois = new ObjectInputStream(fis))
+ {
+ data = (Data) ois.readObject();
+ final long offsets[] = data.offsets;
+ final Map<Long, String> map[] = data.mappings;
+ IntStream
+ .range(0, numPartitions)
+ .forEach(i -> log.info(
+ "restored locally stored state for partition {}: position={}, entries={}",
+ i,
+ offsets[i],
+ offsets[i] == 0 ? 0 : map[i].size()));
+ return;
+ }
+ catch (IOException | ClassNotFoundException e)
+ {
+ log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
+ }
+ }
+
+ log.info("will restore state from Kafka");
+ data = new Data(numPartitions);
+ }
+ finally
+ {
+ this.data = data;
+ }
}
try
{
int partition = partitionForId(transfer.getId());
- mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
+ data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
}
catch (JsonProcessingException e)
{
{
return
Optional
- .ofNullable(this.mappings[partitionForId(id)])
+ .ofNullable(data.mappings[partitionForId(id)])
.map(mapping -> mapping.get(id))
.map(json -> {
try
@Override
public void remove(Long id)
{
- mappings[partitionForId(id)].remove(id);
+ data.mappings[partitionForId(id)].remove(id);
}
@Override
- public void resetStorageForPartition(int partition)
+ public long activatePartition(int partition)
{
- log.info(
- "resetting storage for partition {}: dropping {} entries",
- partition,
- mappings[partition].size());
- mappings[partition].clear();
+ return data.offsets[partition];
}
+ @Override
+ public void deactivatePartition(int partition, long offset)
+ {
+ data.offsets[partition] = offset;
+ }
+
+ @Override
+ public long storedPosition(int partition)
+ {
+ return data.offsets[partition];
+ }
+
+ @Override
+ public void storeState(Map<Integer, Long> offsets)
+ {
+ offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+ stateStore.ifPresent(file ->
+ {
+ try (
+ FileOutputStream fos = new FileOutputStream(file);
+ ObjectOutputStream oos = new ObjectOutputStream(fos))
+ {
+ oos.writeObject(data);
+ IntStream
+ .range(0, numPartitions)
+ .forEach(i -> log.info(
+ "locally stored state for partition {}: position={}, entries={}",
+ i,
+ data.offsets[i],
+ data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
+ }
+ catch (IOException e)
+ {
+ log.error("could not write state to store {}: {}", file, e.getMessage());
+ }
+ });
+ }
+
+
private int partitionForId(long id)
{
String key = Long.toString(id);
return TransferPartitioner.computeHashForKey(key, numPartitions);
}
+
+
+ static class Data implements Serializable
+ {
+ final long offsets[];
+ final Map<Long, String> mappings[];
+
+ Data(int numPartitions)
+ {
+ offsets = new long[numPartitions];
+ mappings = new Map[numPartitions];
+ for (int i = 0; i < numPartitions; i++)
+ {
+ offsets[i] = 0;
+ mappings[i] = new HashMap<>();
+ }
+ }
+ }
}
import de.juplo.kafka.payment.transfer.domain.Transfer;
+import java.util.Map;
import java.util.Optional;
void remove(Long id);
- void resetStorageForPartition(int partition);
+ long activatePartition(int partition);
+
+ void deactivatePartition(int partition, long offset);
+
+ long storedPosition(int partition);
+
+ void storeState(Map<Integer, Long> offsets);
}