refactor: Handling der Partitionen in WordcountRebalanceListener
[demos/kafka/training] / src / main / java / de / juplo / kafka / WordcountRecordHandler.java
index 5981c7d..bdf4b32 100644 (file)
@@ -22,7 +22,6 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
 
 
   private final PartitionStatisticsRepository repository;
-  private final String id;
   private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
@@ -78,37 +77,14 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
     }
   }
 
-  @Override
-  public void onPartitionAssigned(TopicPartition tp)
+  public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
   {
-    Integer partition = tp.partition();
-    Long offset = consumer.position(tp);
-    log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-    StatisticsDocument document =
-        repository
-            .findById(Integer.toString(partition))
-            .orElse(new StatisticsDocument(partition));
-    if (document.offset >= 0)
-    {
-      // Only seek, if a stored offset was found
-      // Otherwise: Use initial offset, generated by Kafka
-      consumer.seek(tp, document.offset);
-    }
-    seen.put(partition, document.statistics);
+    seen.put(partition, statistics);
   }
 
-  @Override
-  public void onPartitionRevoked(TopicPartition tp)
+  public Map<String, Map<String, Long>> removePartition(Integer partition)
   {
-    Integer partition = tp.partition();
-    Long newOffset = consumer.position(tp);
-    log.info(
-        "{} - removing partition: {}, offset of next message {})",
-        id,
-        partition,
-        newOffset);
-    Map<String, Map<String, Long>> removed = seen.remove(partition);
-    repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+    return seen.remove(partition);
   }