GRÜN: Erwartungen implementiert
[demos/kafka/training] / src / main / java / de / juplo / kafka / WordcountRecordHandler.java
index 5981c7d..4efc547 100644 (file)
@@ -1,37 +1,21 @@
 package de.juplo.kafka;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
 
 
-@RequiredArgsConstructor
 @Slf4j
 public class WordcountRecordHandler implements RecordHandler<String, String>
 {
   final static Pattern PATTERN = Pattern.compile("\\W+");
 
 
-  private final PartitionStatisticsRepository repository;
-  private final String id;
-  private final String topic;
-  private final Clock clock;
-  private final Duration commitInterval;
-  private final Consumer<String, String> consumer;
-
   private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
 
-  private Instant lastCommit = Instant.EPOCH;
-
 
   @Override
   public void accept(ConsumerRecord<String, String> record)
@@ -62,53 +46,14 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
     }
   }
 
-
-  @Override
-  public void beforeNextPoll()
+  public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
   {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      seen.forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
-              partiton,
-              statistics,
-              consumer.position(new TopicPartition(topic, partiton)))));
-      lastCommit = clock.instant();
-    }
+    seen.put(partition, statistics);
   }
 
-  @Override
-  public void onPartitionAssigned(TopicPartition tp)
-  {
-    Integer partition = tp.partition();
-    Long offset = consumer.position(tp);
-    log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-    StatisticsDocument document =
-        repository
-            .findById(Integer.toString(partition))
-            .orElse(new StatisticsDocument(partition));
-    if (document.offset >= 0)
-    {
-      // Only seek, if a stored offset was found
-      // Otherwise: Use initial offset, generated by Kafka
-      consumer.seek(tp, document.offset);
-    }
-    seen.put(partition, document.statistics);
-  }
-
-  @Override
-  public void onPartitionRevoked(TopicPartition tp)
+  public Map<String, Map<String, Long>> removePartition(Integer partition)
   {
-    Integer partition = tp.partition();
-    Long newOffset = consumer.position(tp);
-    log.info(
-        "{} - removing partition: {}, offset of next message {})",
-        id,
-        partition,
-        newOffset);
-    Map<String, Map<String, Long>> removed = seen.remove(partition);
-    repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+    return seen.remove(partition);
   }