X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;h=dd1d7add5e13f9b8cc4968df8e869ebfb7ab6adc;hp=c29adf7657a6015f315b64f0281e6df235517657;hb=a9a2158de30cb47c96cfabf06c797d81a352a1e1;hpb=2432aeedb30ac4c1405045514d8eacb791a4d352 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index c29adf7..dd1d7ad 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -74,6 +74,9 @@ public class InMemoryTransferRepository implements TransferRepository { int partition = partitionForId(transfer.getId()); data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); + // We reset the offset for the state of the modified partition, + // because the corresponding offset is not known (yet). + data.offsets[partition] = 0; } catch (JsonProcessingException e) { @@ -109,6 +112,19 @@ public class InMemoryTransferRepository implements TransferRepository @Override public long activatePartition(int partition) { + if (data.offsets[partition] == 0) + { + // Initialize the state of the partition, if + // no corresponding offset is known. + if (data.mappings[partition] != null) + log.warn( + "dropping state for partition {} ({} entries), because the corresponding offset is unknown!", + partition, + data.mappings[partition].size()); + + data.mappings[partition] = new HashMap<>(); + } + return data.offsets[partition]; } @@ -128,6 +144,18 @@ public class InMemoryTransferRepository implements TransferRepository public void storeState(Map offsets) { offsets.forEach((partition, offset) -> data.offsets[partition] = offset); + for (int i = 0; i < numPartitions; i++) + { + if (data.offsets[i] == 0 && data.mappings[i] != null) + { + log.warn( + "dropping state for partition {} ({} entries), because the corresponding offset is unknown!", + i, + data.mappings[i].size()); + + data.mappings[i] = null; + } + } stateStore.ifPresent(file -> { try ( @@ -168,10 +196,7 @@ public class InMemoryTransferRepository implements TransferRepository offsets = new long[numPartitions]; mappings = new Map[numPartitions]; for (int i = 0; i < numPartitions; i++) - { offsets[i] = 0; - mappings[i] = new HashMap<>(); - } } } }