1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.apache.kafka.clients.consumer.ConsumerRecord;
7 import org.apache.kafka.common.TopicPartition;
9 import java.time.Clock;
10 import java.time.Duration;
11 import java.time.Instant;
12 import java.util.HashMap;
14 import java.util.regex.Pattern;
// Record handler that maintains per-partition word-count statistics, keyed by
// user (the record key), and periodically persists both the statistics and the
// consumer offsets through a repository.
// NOTE(review): this listing keeps the original file's line numbers and is
// elided in places — comments describe only what is visible here.
17 @RequiredArgsConstructor
19 public class WordcountRecordHandler implements RecordHandler<String, String>
// Split record values into words on runs of non-word characters
// (compiled once, as a constant — Pattern is thread-safe).
21 final static Pattern PATTERN = Pattern.compile("\\W+");
// Persistence for per-partition statistics + offsets (presumably a Mongo-style
// document store — TODO confirm against PartitionStatisticsRepository).
24 private final PartitionStatisticsRepository repository;
// Instance id used only for log correlation (see onPartitionAssigned).
25 private final String id;
// Topic this handler consumes; used to build TopicPartition for position().
26 private final String topic;
// Injected Clock so commit timing is testable.
27 private final Clock clock;
// Minimum interval between two persist-and-commit passes in beforeNextPoll().
28 private final Duration commitInterval;
// The consumer itself, needed for position()/seek() during rebalancing.
29 private final Consumer<String, String> consumer;
// In-memory state: partition -> user -> word -> count.
// Populated on partition assignment, dropped on revocation.
31 private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
// Timestamp of the last persist; EPOCH forces a store on the first
// beforeNextPoll() after commitInterval has elapsed.
33 private Instant lastCommit = Instant.EPOCH;
// Consumes one record: looks up (or lazily creates) the per-user word map for
// the record's partition, then counts each word of the record value.
// NOTE(review): lines are elided in this listing — the null-check around the
// lazy init and the tail of the counting loop are not visible here.
37 public void accept(ConsumerRecord<String, String> record)
39 Integer partition = record.partition();
// The record key is treated as the user whose words are being counted.
40 String user = record.key();
// Per-partition state must already exist (installed by onPartitionAssigned);
// assumes the partition was assigned before records arrive — TODO confirm.
41 Map<String, Map<String, Long>> users = seen.get(partition);
43 Map<String, Long> words = users.get(user);
// Lazy init of the per-user word-count map (guard condition elided above).
46 words = new HashMap<>();
47 users.put(user, words);
// Tokenize the value on non-word characters and update counts per word.
50 for (String word : PATTERN.split(record.value()))
52 Long num = words.get(word);
// Called between polls: if at least commitInterval has passed since the last
// persist, stores every partition's statistics together with the consumer's
// current position for that partition, then resets the timer.
// NOTE(review): the StatisticsDocument constructor arguments between lines 73
// and 76 are elided in this listing.
67 public void beforeNextPoll()
69 if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
71 log.debug("Storing data and offsets, last commit: {}", lastCommit);
// NOTE(review): "partiton" is a typo for "partition" (local lambda variable;
// renaming it would be a code change, so only flagged here).
72 seen.forEach((partiton, statistics) -> repository.save(
73 new StatisticsDocument(
// Persist the offset of the next record to be fetched for this partition,
// so a restart can seek back to exactly this point (see onPartitionAssigned).
76 consumer.position(new TopicPartition(topic, partiton)))));
77 lastCommit = clock.instant();
// Rebalance callback: restores persisted statistics and the stored offset for
// a newly assigned partition, seeking the consumer back to the stored offset
// when one exists.
// NOTE(review): the receiver of the findById(...) chain (line 88, presumably
// `repository`) is elided in this listing.
82 public void onPartitionAssigned(TopicPartition tp)
84 Integer partition = tp.partition();
// Offset Kafka would use by default (before any seek below).
85 Long offset = consumer.position(tp);
86 log.info("{} - adding partition: {}, offset={}", id, partition, offset);
// Load the persisted state for this partition, or start fresh if none exists.
// Document ids are the partition number rendered as a String.
87 StatisticsDocument document =
89 .findById(Integer.toString(partition))
90 .orElse(new StatisticsDocument(partition));
// A non-negative stored offset means we persisted state before; a fresh
// document presumably carries a negative sentinel offset — TODO confirm
// against StatisticsDocument.
91 if (document.offset >= 0)
93 // Only seek, if a stored offset was found
94 // Otherwise: Use initial offset, generated by Kafka
95 consumer.seek(tp, document.offset);
// Install the restored (or empty) statistics as this partition's live state.
97 seen.put(partition, document.statistics);
// Rebalance callback: persists and discards the in-memory statistics for a
// partition that is being taken away, together with the offset of the next
// message, so the new owner (or a restart) can resume from there.
// NOTE(review): the opening of the log.info(...) call (line 105) is elided in
// this listing.
101 public void onPartitionRevoked(TopicPartition tp)
103 Integer partition = tp.partition();
104 Long newOffset = consumer.position(tp);
// NOTE(review): stray ")" at the end of this format string — it is a runtime
// log message, so left untouched in a doc-only edit.
106 "{} - removing partition: {}, offset of next message {})",
// Drop the partition's state from memory; it is persisted just below.
110 Map<String, Map<String, Long>> removed = seen.remove(partition);
// NOTE(review): consumer.position(tp) is called a second time here although
// newOffset already holds it — redundant but harmless if nothing moved.
111 repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
115 public Map<Integer, Map<String, Map<String, Long>>> getSeen()