From: Kai Moritz Date: Tue, 29 Jun 2021 18:56:39 +0000 (+0200) Subject: State must only be stored locally, if the corresponding offset is known X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a9a2158de30cb47c96cfabf06c797d81a352a1e1;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer State must only be stored locally, if the corresponding offset is known --- diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index c29adf7..dd1d7ad 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -74,6 +74,9 @@ public class InMemoryTransferRepository implements TransferRepository { int partition = partitionForId(transfer.getId()); data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); + // We reset the offset for the state of the modified partition, + // because the corresponding offset is not known (yet). + data.offsets[partition] = 0; } catch (JsonProcessingException e) { @@ -109,6 +112,19 @@ public class InMemoryTransferRepository implements TransferRepository @Override public long activatePartition(int partition) { + if (data.offsets[partition] == 0) + { + // Initialize the state of the partition, if + // no corresponding offset is known. + if (data.mappings[partition] != null) + log.warn( + "dropping state for partition {} ({} entries), because the corresponding offset is unknown!", + partition, + data.mappings[partition].size()); + + data.mappings[partition] = new HashMap<>(); + } + return data.offsets[partition]; } @@ -128,6 +144,18 @@ public class InMemoryTransferRepository implements TransferRepository public void storeState(Map offsets) { offsets.forEach((partition, offset) -> data.offsets[partition] = offset); + for (int i = 0; i < numPartitions; i++) + { + if (data.offsets[i] == 0 && data.mappings[i] != null) + { + log.warn( + "dropping state for partition {} ({} entries), because the corresponding offset is unknown!", + i, + data.mappings[i].size()); + + data.mappings[i] = null; + } + } stateStore.ifPresent(file -> { try ( @@ -168,10 +196,7 @@ public class InMemoryTransferRepository implements TransferRepository offsets = new long[numPartitions]; mappings = new Map[numPartitions]; for (int i = 0; i < numPartitions; i++) - { offsets[i] = 0; - mappings[i] = new HashMap<>(); - } } } }