refactor: Handling der Partitionen in WordcountRebalanceListener
[demos/kafka/training] / src / main / java / de / juplo / kafka / WordcountRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.Consumer;
6 import org.apache.kafka.clients.consumer.ConsumerRecord;
7 import org.apache.kafka.common.TopicPartition;
8
9 import java.time.Clock;
10 import java.time.Duration;
11 import java.time.Instant;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.regex.Pattern;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 public class WordcountRecordHandler implements RecordHandler<String, String>
20 {
21   final static Pattern PATTERN = Pattern.compile("\\W+");
22
23
24   private final PartitionStatisticsRepository repository;
25   private final String topic;
26   private final Clock clock;
27   private final Duration commitInterval;
28   private final Consumer<String, String> consumer;
29
30   private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
31
32   private Instant lastCommit = Instant.EPOCH;
33
34
35   @Override
36   public void accept(ConsumerRecord<String, String> record)
37   {
38     Integer partition = record.partition();
39     String user = record.key();
40     Map<String, Map<String, Long>> users = seen.get(partition);
41
42     Map<String, Long> words = users.get(user);
43     if (words == null)
44     {
45       words = new HashMap<>();
46       users.put(user, words);
47     }
48
49     for (String word : PATTERN.split(record.value()))
50     {
51       Long num = words.get(word);
52       if (num == null)
53       {
54         num = 1l;
55       }
56       else
57       {
58         num++;
59       }
60       words.put(word, num);
61     }
62   }
63
64
65   @Override
66   public void beforeNextPoll()
67   {
68     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
69     {
70       log.debug("Storing data and offsets, last commit: {}", lastCommit);
71       seen.forEach((partiton, statistics) -> repository.save(
72           new StatisticsDocument(
73               partiton,
74               statistics,
75               consumer.position(new TopicPartition(topic, partiton)))));
76       lastCommit = clock.instant();
77     }
78   }
79
80   public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
81   {
82     seen.put(partition, statistics);
83   }
84
85   public Map<String, Map<String, Long>> removePartition(Integer partition)
86   {
87     return seen.remove(partition);
88   }
89
90
91   public Map<Integer, Map<String, Map<String, Long>>> getSeen()
92   {
93     return seen;
94   }
95 }