TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 19:38:04 +0000 (20:38 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:27:50 +0000 (14:27 +0100)
* Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn
  die Implementierung um Transaktionen erweitert wird
* _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach
  der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte,
  versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt!

src/main/java/de/juplo/kafka/ExampleConsumer.java

index a0a91b5..fa2ff81 100644 (file)
@@ -115,6 +115,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
                 record.value());
             }
 
+            checkRestoreProgress(partition);
+
             done[partition.partition()] = true;
           });
 
@@ -167,32 +169,44 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     }
     else
     {
-      handleState(partition, offset, key, value);
+      handleState(partition, key, value);
+    }
+  }
+
+  private void checkRestoreProgress(TopicPartition topicPartition)
+  {
+    int partition = topicPartition.partition();
+
+    if (partitionStates[partition] == State.RESTORING)
+    {
+      long consumerPosition = consumer.position(topicPartition);
+
+      if (consumerPosition + 1 >= stateEndOffsets[partition])
+      {
+        log.info(
+          "{} - Position of consumer is {}. Restoring of state for partition {} done!",
+          id,
+          consumerPosition,
+          topicPartition);
+        stateAssigned(partition);
+      }
+      else
+      {
+        log.debug(
+          "{} - Restored state up to offset {}, end-offset: {}",
+          id,
+          consumerPosition,
+          stateEndOffsets[partition]);
+      }
     }
   }
 
   private synchronized void handleState(
     int partition,
-    long offset,
     String key,
     String value)
   {
     restoredState[partition].put(key, Long.parseLong(value));
-    if (offset + 1 == stateEndOffsets[partition])
-    {
-      log.info("{} - Restoring of state for partition {} done!", id, partition);
-      stateAssigned(partition);
-    }
-    else
-    {
-      log.debug(
-        "{} - Restored state up to offset {}, end-offset: {}, state: {}={}",
-        id,
-        offset,
-        stateEndOffsets[partition],
-        key,
-        value);
-    }
   }
 
   private void handleMessage(