refactor: Alle Kafka-Belange in den `WordcountRebalanceListener` verschoben
[demos/kafka/training] / src / main / java / de / juplo / kafka / WordcountRecordHandler.java
index bdf4b32..4efc547 100644 (file)
@@ -1,36 +1,21 @@
 package de.juplo.kafka;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
 
 
-@RequiredArgsConstructor
 @Slf4j
 public class WordcountRecordHandler implements RecordHandler<String, String>
 {
   final static Pattern PATTERN = Pattern.compile("\\W+");
 
 
-  private final PartitionStatisticsRepository repository;
-  private final String topic;
-  private final Clock clock;
-  private final Duration commitInterval;
-  private final Consumer<String, String> consumer;
-
   private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
 
-  private Instant lastCommit = Instant.EPOCH;
-
 
   @Override
   public void accept(ConsumerRecord<String, String> record)
@@ -61,22 +46,6 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
     }
   }
 
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      seen.forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
-              partiton,
-              statistics,
-              consumer.position(new TopicPartition(topic, partiton)))));
-      lastCommit = clock.instant();
-    }
-  }
-
   public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
   {
     seen.put(partition, statistics);