package de.juplo.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
17 @RequiredArgsConstructor
19 public class WordcountRecordHandler implements RecordHandler<String, String>
21 final static Pattern PATTERN = Pattern.compile("\\W+");
24 private final PartitionStatisticsRepository repository;
25 private final String topic;
26 private final Clock clock;
27 private final Duration commitInterval;
28 private final Consumer<String, String> consumer;
30 private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
32 private Instant lastCommit = Instant.EPOCH;
36 public void accept(ConsumerRecord<String, String> record)
38 Integer partition = record.partition();
39 String user = record.key();
40 Map<String, Map<String, Long>> users = seen.get(partition);
42 Map<String, Long> words = users.get(user);
45 words = new HashMap<>();
46 users.put(user, words);
49 for (String word : PATTERN.split(record.value()))
51 Long num = words.get(word);
66 public void beforeNextPoll()
68 if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
70 log.debug("Storing data and offsets, last commit: {}", lastCommit);
71 seen.forEach((partiton, statistics) -> repository.save(
72 new StatisticsDocument(
75 consumer.position(new TopicPartition(topic, partiton)))));
76 lastCommit = clock.instant();
80 public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
82 seen.put(partition, statistics);
85 public Map<String, Map<String, Long>> removePartition(Integer partition)
87 return seen.remove(partition);
91 public Map<Integer, Map<String, Map<String, Long>>> getSeen()