From 2432aeedb30ac4c1405045514d8eacb791a4d352 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 29 Jun 2021 20:51:54 +0200 Subject: [PATCH] The state is periodically stored in a local file, that is read on startup --- application.yml | 2 + .../transfer/TransferServiceApplication.java | 58 ++++++++- .../transfer/TransferServiceProperties.java | 2 + .../transfer/adapter/TransferConsumer.java | 54 +++++++- .../InMemoryTransferRepository.java | 118 +++++++++++++++--- .../transfer/ports/TransferRepository.java | 9 +- 6 files changed, 221 insertions(+), 22 deletions(-) diff --git a/application.yml b/application.yml index 818a596..daac570 100644 --- a/application.yml +++ b/application.yml @@ -1,3 +1,5 @@ juplo: transfer: group-instance-id: peter + state-store-interval: 15 + local-state-store-path: state.bin diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index a82f8b1..5114a1c 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -7,6 +7,7 @@ import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import de.juplo.kafka.payment.transfer.ports.TransferService; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; @@ -22,7 +23,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Clock; import java.util.Optional; import java.util.Properties; @@ -87,6 +94,7 @@ public class TransferServiceApplication KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, + LocalStateStoreSettings localStateStoreSettings, ObjectMapper mapper, TransferService productionTransferService, TransferService restoreTransferService) @@ -99,6 +107,8 @@ public class TransferServiceApplication consumer, adminClient, repository, + Clock.systemDefaultZone(), + localStateStoreSettings.interval, mapper, new TransferConsumer.ConsumerUseCases() { @Override @@ -149,12 +159,58 @@ public class TransferServiceApplication return new KafkaMessagingService(producer, mapper, properties.getTopic()); } + @RequiredArgsConstructor + static class LocalStateStoreSettings + { + final Optional file; + final int interval; + } + + @Bean + LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties) + { + if (properties.getStateStoreInterval() < 1) + { + log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated"); + return new LocalStateStoreSettings(Optional.empty(), 0); + } + + if (!StringUtils.hasText(properties.getLocalStateStorePath())) + { + log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!"); + return new LocalStateStoreSettings(Optional.empty(), 0); + } + + Path path = Path.of(properties.getLocalStateStorePath()); + log.info("using {} as local state store", path.toAbsolutePath()); + + if (Files.notExists(path)) + { + try + { + Files.createFile(path); + } + catch (IOException e) + { + throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath()); + } + } + + if (!(Files.isReadable(path) && Files.isWritable(path))) + { + throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath()); + } + + return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval()); + } + @Bean InMemoryTransferRepository inMemoryTransferRepository( + LocalStateStoreSettings localStateStoreSettings, TransferServiceProperties properties, ObjectMapper mapper) { - return new InMemoryTransferRepository(properties.getNumPartitions(), mapper); + return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper); } @Bean diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java index e001748..907a8b9 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java @@ -20,6 +20,8 @@ public class TransferServiceProperties private String groupId = "transfers"; private String groupInstanceId; private Map instanceIdUriMapping; + private String localStateStorePath; + private int stateStoreInterval = 60; public Map getInstanceIdUriMapping() { diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index 2ef7ee3..aa00737 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -21,7 +21,9 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -51,6 +53,9 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener private final Map instanceIdUriMapping; private final String[] instanceIdByPartition; + private Clock clock; + private int stateStoreInterval; + private volatile boolean partitionOwnershipUnknown = true; @@ -61,6 +66,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener KafkaConsumer consumer, AdminClient adminClient, TransferRepository repository, + Clock clock, + int stateStoreInterval, ObjectMapper mapper, ConsumerUseCases productionUseCases, ConsumerUseCases restoreUseCases) @@ -82,6 +89,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener this.consumer = consumer; this.adminClient = adminClient; this.repository = repository; + this.clock = clock; + this.stateStoreInterval = stateStoreInterval; this.mapper = mapper; this.productionUseCases = productionUseCases; this.restoreUseCases = restoreUseCases; @@ -91,6 +100,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener @Override public void run() { + Instant stateStored = clock.instant(); + while (running) { try @@ -101,6 +112,25 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.debug("polled {} records", records.count()); records.forEach(record -> handleRecord(record, productionUseCases)); + + Instant now = clock.instant(); + if ( + stateStoreInterval > 0 && + Duration.between(stateStored, now).getSeconds() >= stateStoreInterval) + { + Map offsets = new HashMap<>(); + + for (TopicPartition topicPartition : consumer.assignment()) + { + Integer partition = topicPartition.partition(); + Long offset = consumer.position(topicPartition); + log.info("storing state locally for {}/{}: {}", topic, partition, offset); + offsets.put(partition, offset); + } + + repository.storeState(offsets); + stateStored = now; + } } catch (WakeupException e) { @@ -196,6 +226,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { partitionOwnershipUnknown = true; log.info("partitions revoked: {}", partitions); + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = consumer.position(topicPartition); + log.info("deactivating partition {}, offset: {}", partition, offset); + repository.deactivatePartition(partition, offset); + } } @Override @@ -204,7 +241,17 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener log.info("partitions assigned: {}", partitions); fetchAssignmentsAsync(); if (partitions.size() > 0) + { + for (TopicPartition topicPartition : partitions) + { + int partition = topicPartition.partition(); + long offset = repository.activatePartition(partition); + log.info("activated partition {}, seeking to offset {}", partition, offset); + consumer.seek(topicPartition, offset); + } + restore(partitions); + } } private void fetchAssignmentsAsync() @@ -249,11 +296,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener { log.info("--> starting restore..."); - partitions - .stream() - .map(topicPartition -> topicPartition.partition()) - .forEach(partition -> repository.resetStorageForPartition(partition)); - Map lastSeen = consumer .endOffsets(partitions) @@ -269,7 +311,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener .stream() .collect(Collectors.toMap( partition -> partition, - partition -> 0l)); + partition -> repository.storedPosition(partition))); while ( positions diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index 2a4d734..c29adf7 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -7,26 +7,63 @@ import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; +import java.io.*; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.stream.IntStream; @Slf4j public class InMemoryTransferRepository implements TransferRepository { private final int numPartitions; - private final Map mappings[]; private final ObjectMapper mapper; + private final Data data; + private final Optional stateStore; - public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper) + + public InMemoryTransferRepository(Optional stateStore, int numPartitions, ObjectMapper mapper) { + this.stateStore = stateStore; this.numPartitions = numPartitions; - this.mappings = new HashMap[numPartitions]; - for (int i = 0; i < numPartitions; i++) - this.mappings[i] = new HashMap<>(); this.mapper = mapper; + + Data data = null; + try + { + if (stateStore.isPresent()) + { + try ( + FileInputStream fis = new FileInputStream(stateStore.get()); + ObjectInputStream ois = new ObjectInputStream(fis)) + { + data = (Data) ois.readObject(); + final long offsets[] = data.offsets; + final Map map[] = data.mappings; + IntStream + .range(0, numPartitions) + .forEach(i -> log.info( + "restored locally stored state for partition {}: position={}, entries={}", + i, + offsets[i], + offsets[i] == 0 ? 0 : map[i].size())); + return; + } + catch (IOException | ClassNotFoundException e) + { + log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage()); + } + } + + log.info("will restore state from Kafka"); + data = new Data(numPartitions); + } + finally + { + this.data = data; + } } @@ -36,7 +73,7 @@ public class InMemoryTransferRepository implements TransferRepository try { int partition = partitionForId(transfer.getId()); - mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); + data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); } catch (JsonProcessingException e) { @@ -49,7 +86,7 @@ public class InMemoryTransferRepository implements TransferRepository { return Optional - .ofNullable(this.mappings[partitionForId(id)]) + .ofNullable(data.mappings[partitionForId(id)]) .map(mapping -> mapping.get(id)) .map(json -> { try @@ -66,22 +103,75 @@ public class InMemoryTransferRepository implements TransferRepository @Override public void remove(Long id) { - mappings[partitionForId(id)].remove(id); + data.mappings[partitionForId(id)].remove(id); } @Override - public void resetStorageForPartition(int partition) + public long activatePartition(int partition) { - log.info( - "resetting storage for partition {}: dropping {} entries", - partition, - mappings[partition].size()); - mappings[partition].clear(); + return data.offsets[partition]; } + @Override + public void deactivatePartition(int partition, long offset) + { + data.offsets[partition] = offset; + } + + @Override + public long storedPosition(int partition) + { + return data.offsets[partition]; + } + + @Override + public void storeState(Map offsets) + { + offsets.forEach((partition, offset) -> data.offsets[partition] = offset); + stateStore.ifPresent(file -> + { + try ( + FileOutputStream fos = new FileOutputStream(file); + ObjectOutputStream oos = new ObjectOutputStream(fos)) + { + oos.writeObject(data); + IntStream + .range(0, numPartitions) + .forEach(i -> log.info( + "locally stored state for partition {}: position={}, entries={}", + i, + data.offsets[i], + data.offsets[i] == 0 ? 0 : data.mappings[i].size())); + } + catch (IOException e) + { + log.error("could not write state to store {}: {}", file, e.getMessage()); + } + }); + } + + private int partitionForId(long id) { String key = Long.toString(id); return TransferPartitioner.computeHashForKey(key, numPartitions); } + + + static class Data implements Serializable + { + final long offsets[]; + final Map mappings[]; + + Data(int numPartitions) + { + offsets = new long[numPartitions]; + mappings = new Map[numPartitions]; + for (int i = 0; i < numPartitions; i++) + { + offsets[i] = 0; + mappings[i] = new HashMap<>(); + } + } + } } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java index 0629eab..f21604d 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java @@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.ports; import de.juplo.kafka.payment.transfer.domain.Transfer; +import java.util.Map; import java.util.Optional; @@ -13,5 +14,11 @@ public interface TransferRepository void remove(Long id); - void resetStorageForPartition(int partition); + long activatePartition(int partition); + + void deactivatePartition(int partition, long offset); + + long storedPosition(int partition); + + void storeState(Map offsets); } -- 2.20.1