From bf40c8cf3bdf45f9969747c7a6285b2cbe034299 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 20:38:04 +0100 Subject: [PATCH] =?utf8?q?TX-kompatibler=20Weg=20zur=20Pr=C3=BCfung=20auf?= =?utf8?q?=20eine=20abgeschlossene=20Wiederherstellung?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn die Implementierung um Transaktionen erweitert wird * _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte, versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt! --- .../java/de/juplo/kafka/ExampleConsumer.java | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index a0a91b5..fa2ff81 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -115,6 +115,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener record.value()); } + checkRestoreProgress(partition); + done[partition.partition()] = true; }); @@ -167,32 +169,44 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } else { - handleState(partition, offset, key, value); + handleState(partition, key, value); + } + } + + private void checkRestoreProgress(TopicPartition topicPartition) + { + int partition = topicPartition.partition(); + + if (partitionStates[partition] == State.RESTORING) + { + long consumerPosition = consumer.position(topicPartition); + + if (consumerPosition + 1 >= stateEndOffsets[partition]) + { + log.info( + "{} - Position of consumer is {}. Restoring of state for partition {} done!", + id, + consumerPosition, + topicPartition); + stateAssigned(partition); + } + else + { + log.debug( + "{} - Restored state up to offset {}, end-offset: {}", + id, + consumerPosition, + stateEndOffsets[partition]); + } } } private synchronized void handleState( int partition, - long offset, String key, String value) { restoredState[partition].put(key, Long.parseLong(value)); - if (offset + 1 == stateEndOffsets[partition]) - { - log.info("{} - Restoring of state for partition {} done!", id, partition); - stateAssigned(partition); - } - else - { - log.debug( - "{} - Restored state up to offset {}, end-offset: {}, state: {}={}", - id, - offset, - stateEndOffsets[partition], - key, - value); - } } private void handleMessage( -- 2.20.1