record.value());
             }
 
+            checkRestoreProgress(partition);
+
             done[partition.partition()] = true;
           });
 
     }
     else
     {
-      handleState(partition, offset, key, value);
+      handleState(partition, key, value);
+    }
+  }
+
+  private void checkRestoreProgress(TopicPartition topicPartition)
+  {
+    int partition = topicPartition.partition();
+
+    if (partitionStates[partition] == State.RESTORING)
+    {
+      long consumerPosition = consumer.position(topicPartition);
+
+      if (consumerPosition + 1 >= stateEndOffsets[partition])
+      {
+        log.info(
+          "{} - Position of consumer is {}. Restoring of state for partition {} done!",
+          id,
+          consumerPosition,
+          topicPartition);
+        stateAssigned(partition);
+      }
+      else
+      {
+        log.debug(
+          "{} - Restored state up to offset {}, end-offset: {}",
+          id,
+          consumerPosition,
+          stateEndOffsets[partition]);
+      }
     }
   }
 
   private synchronized void handleState(
     int partition,
-    long offset,
     String key,
     String value)
   {
     restoredState[partition].put(key, Long.parseLong(value));
-    if (offset + 1 == stateEndOffsets[partition])
-    {
-      log.info("{} - Restoring of state for partition {} done!", id, partition);
-      stateAssigned(partition);
-    }
-    else
-    {
-      log.debug(
-        "{} - Restored state up to offset {}, end-offset: {}, state: {}={}",
-        id,
-        offset,
-        stateEndOffsets[partition],
-        key,
-        value);
-    }
   }
 
   private void handleMessage(