X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FWordcountRecordHandler.java;h=4efc54726539c61c017d336ae07957c1b84fdefb;hb=0b6377951bbeeb2443415bc2aa9c68988ee4c5af;hp=bdf4b328c048171f3124e3e24c4454c60166da25;hpb=fc682d9890787ef363b3e189f6f880a043f3c541;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java index bdf4b32..4efc547 100644 --- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java +++ b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java @@ -1,36 +1,21 @@ package de.juplo.kafka; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; -@RequiredArgsConstructor @Slf4j public class WordcountRecordHandler implements RecordHandler { final static Pattern PATTERN = Pattern.compile("\\W+"); - private final PartitionStatisticsRepository repository; - private final String topic; - private final Clock clock; - private final Duration commitInterval; - private final Consumer consumer; - private final Map>> seen = new HashMap<>(); - private Instant lastCommit = Instant.EPOCH; - @Override public void accept(ConsumerRecord record) @@ -61,22 +46,6 @@ public class WordcountRecordHandler implements RecordHandler } } - - @Override - public void beforeNextPoll() - { - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) - { - log.debug("Storing data and offsets, last commit: {}", lastCommit); - seen.forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( - partiton, - statistics, - consumer.position(new TopicPartition(topic, partiton))))); - lastCommit = clock.instant(); - } - } - public void addPartition(Integer partition, Map> statistics) { seen.put(partition, statistics);