From: Kai Moritz Date: Sat, 2 Nov 2024 19:38:04 +0000 (+0100) Subject: TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-13--si~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=bf40c8cf3bdf45f9969747c7a6285b2cbe034299;p=demos%2Fkafka%2Ftraining TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung * Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn die Implementierung um Transaktionen erweitert wird * _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte, versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt! --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index a0a91b5..fa2ff81 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -115,6 +115,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener record.value()); } + checkRestoreProgress(partition); + done[partition.partition()] = true; }); @@ -167,32 +169,44 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } else { - handleState(partition, offset, key, value); + handleState(partition, key, value); + } + } + + private void checkRestoreProgress(TopicPartition topicPartition) + { + int partition = topicPartition.partition(); + + if (partitionStates[partition] == State.RESTORING) + { + long consumerPosition = consumer.position(topicPartition); + + if (consumerPosition + 1 >= stateEndOffsets[partition]) + { + log.info( + "{} - Position of consumer is {}. Restoring of state for partition {} done!", + id, + consumerPosition, + topicPartition); + stateAssigned(partition); + } + else + { + log.debug( + "{} - Restored state up to offset {}, end-offset: {}", + id, + consumerPosition, + stateEndOffsets[partition]); + } } } private synchronized void handleState( int partition, - long offset, String key, String value) { restoredState[partition].put(key, Long.parseLong(value)); - if (offset + 1 == stateEndOffsets[partition]) - { - log.info("{} - Restoring of state for partition {} done!", id, partition); - stateAssigned(partition); - } - else - { - log.debug( - "{} - Restored state up to offset {}, end-offset: {}, state: {}={}", - id, - offset, - stateEndOffsets[partition], - key, - value); - } } private void handleMessage(