public class ApplicationConfiguration
{
@Bean
- public WordcountRecordHandler wordcountRecordHandler()
+ public SumRecordHandler sumRecordHandler()
{
- return new WordcountRecordHandler();
+ return new SumRecordHandler();
}
@Bean
- public WordcountRebalanceListener wordcountRebalanceListener(
- WordcountRecordHandler wordcountRecordHandler,
+ public SumRebalanceListener sumRebalanceListener(
+ SumRecordHandler sumRecordHandler,
PartitionStatisticsRepository repository,
Consumer<String, String> consumer,
ApplicationProperties properties)
{
- return new WordcountRebalanceListener(
- wordcountRecordHandler,
+ return new SumRebalanceListener(
+ sumRecordHandler,
repository,
properties.getClientId(),
properties.getTopic(),
Clock.systemDefaultZone(),
properties.getCommitInterval(),
consumer);
}

@Bean
public EndlessConsumer<String, String> endlessConsumer(
KafkaConsumer<String, String> kafkaConsumer,
ExecutorService executor,
- WordcountRebalanceListener wordcountRebalanceListener,
- WordcountRecordHandler wordcountRecordHandler,
+ SumRebalanceListener sumRebalanceListener,
+ SumRecordHandler sumRecordHandler,
ApplicationProperties properties)
{
return
new EndlessConsumer<>(
executor,
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- wordcountRebalanceListener,
- wordcountRecordHandler);
+ sumRebalanceListener,
+ sumRecordHandler);
}
@Bean
public ExecutorService executor()
{
return Executors.newSingleThreadExecutor();
}

--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
public class DriverController
{
private final EndlessConsumer consumer;
- private final WordcountRecordHandler wordcount;
+ private final SumRecordHandler wordcount;
@PostMapping("start")
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/SumRebalanceListener.java
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
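+/**
+ * Rebalance listener that restores the per-partition statistics from MongoDB
+ * when a partition is assigned, and persists them together with the offset of
+ * the next message to consume when the partition is revoked or the commit
+ * interval has elapsed.
+ */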
+@RequiredArgsConstructor
+@Slf4j
+public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
+ private final SumRecordHandler handler;
+ private final PartitionStatisticsRepository repository;
+ private final String id;
+ private final String topic;
+ private final Clock clock;
+ private final Duration commitInterval;
+ private final Consumer<String, String> consumer;
+
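+ // EPOCH ensures that the first call to beforeNextPoll() triggers a store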
+ private Instant lastCommit = Instant.EPOCH;
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long offset = consumer.position(tp);
+ log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+ StatisticsDocument document =
+ repository
+ .findById(Integer.toString(partition))
+ .orElse(new StatisticsDocument(partition));
+ if (document.offset >= 0)
+ {
+ // Only seek, if a stored offset was found
+ // Otherwise: Use initial offset, generated by Kafka
+ consumer.seek(tp, document.offset);
+ }
+ handler.addPartition(partition, document.statistics);
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long newOffset = consumer.position(tp);
+ log.info(
+ "{} - removing partition: {}, offset of next message: {}",
+ id,
+ partition,
+ newOffset);
+ Map<String, Map<String, Long>> removed = handler.removePartition(partition);
+ repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+ });
+ }
+
+
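+ // Persists the collected statistics and the current offsets, if the
+ // configured commit interval has elapsed since the last store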
+ @Override
+ public void beforeNextPoll()
+ {
+ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+ {
+ log.debug("Storing data and offsets, last commit: {}", lastCommit);
+ handler.getSeen().forEach((partition, statistics) -> repository.save(
+ new StatisticsDocument(
+ partition,
+ statistics,
+ consumer.position(new TopicPartition(topic, partition)))));
+ lastCommit = clock.instant();
+ }
+ }
+}
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+
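+/**
+ * Counts, per partition and per user (the record key), how often each word
+ * occurs in the consumed messages; words are delimited by non-word characters.
+ */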
+@Slf4j
+public class SumRecordHandler implements RecordHandler<String, String>
+{
+ final static Pattern PATTERN = Pattern.compile("\\W+");
+
+
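+ // partition -> user -> word -> number of occurrences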
+ private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
+
+
+ @Override
+ public void accept(ConsumerRecord<String, String> record)
+ {
+ Integer partition = record.partition();
+ String user = record.key();
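+ // The partition's map was registered in addPartition() by the rebalance
+ // listener, before any of the partition's records are consumed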
+ Map<String, Map<String, Long>> users = seen.get(partition);
+
+ Map<String, Long> words = users.get(user);
+ if (words == null)
+ {
+ words = new HashMap<>();
+ users.put(user, words);
+ }
+
+ for (String word : PATTERN.split(record.value()))
+ {
+ Long num = words.get(word);
+ if (num == null)
+ {
+ num = 1L;
+ }
+ else
+ {
+ num++;
+ }
+ words.put(word, num);
+ }
+ }
+
+ public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
+ {
+ seen.put(partition, statistics);
+ }
+
+ public Map<String, Map<String, Long>> removePartition(Integer partition)
+ {
+ return seen.remove(partition);
+ }
+
+
+ public Map<Integer, Map<String, Map<String, Long>>> getSeen()
+ {
+ return seen;
+ }
+}
--- a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
-{
- private final WordcountRecordHandler handler;
- private final PartitionStatisticsRepository repository;
- private final String id;
- private final String topic;
- private final Clock clock;
- private final Duration commitInterval;
- private final Consumer<String, String> consumer;
-
- private Instant lastCommit = Instant.EPOCH;
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long offset = consumer.position(tp);
- log.info("{} - adding partition: {}, offset={}", id, partition, offset);
- StatisticsDocument document =
- repository
- .findById(Integer.toString(partition))
- .orElse(new StatisticsDocument(partition));
- if (document.offset >= 0)
- {
- // Only seek, if a stored offset was found
- // Otherwise: Use initial offset, generated by Kafka
- consumer.seek(tp, document.offset);
- }
- handler.addPartition(partition, document.statistics);
- });
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long newOffset = consumer.position(tp);
- log.info(
- "{} - removing partition: {}, offset of next message {})",
- id,
- partition,
- newOffset);
- Map<String, Map<String, Long>> removed = handler.removePartition(partition);
- repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
- });
- }
-
-
- @Override
- public void beforeNextPoll()
- {
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data and offsets, last commit: {}", lastCommit);
- handler.getSeen().forEach((partiton, statistics) -> repository.save(
- new StatisticsDocument(
- partiton,
- statistics,
- consumer.position(new TopicPartition(topic, partiton)))));
- lastCommit = clock.instant();
- }
- }
-}
--- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-
-@Slf4j
-public class WordcountRecordHandler implements RecordHandler<String, String>
-{
- final static Pattern PATTERN = Pattern.compile("\\W+");
-
-
- private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
-
-
- @Override
- public void accept(ConsumerRecord<String, String> record)
- {
- Integer partition = record.partition();
- String user = record.key();
- Map<String, Map<String, Long>> users = seen.get(partition);
-
- Map<String, Long> words = users.get(user);
- if (words == null)
- {
- words = new HashMap<>();
- users.put(user, words);
- }
-
- for (String word : PATTERN.split(record.value()))
- {
- Long num = words.get(word);
- if (num == null)
- {
- num = 1l;
- }
- else
- {
- num++;
- }
- words.put(word, num);
- }
- }
-
- public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
- {
- seen.put(partition, statistics);
- }
-
- public Map<String, Map<String, Long>> removePartition(Integer partition)
- {
- return seen.remove(partition);
- }
-
-
- public Map<Integer, Map<String, Map<String, Long>>> getSeen()
- {
- return seen;
- }
-}
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@Autowired
PartitionStatisticsRepository repository;
@Autowired
- WordcountRebalanceListener wordcountRebalanceListener;
+ SumRebalanceListener sumRebalanceListener;
@Autowired
- WordcountRecordHandler wordcountRecordHandler;
+ SumRecordHandler sumRecordHandler;
EndlessConsumer<String, String> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
});
TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
- new TestRecordHandler<String, String>(wordcountRecordHandler) {
+ new TestRecordHandler<String, String>(sumRecordHandler) {
@Override
public void onNewRecord(ConsumerRecord<String, String> record)
{
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- wordcountRebalanceListener,
+ sumRebalanceListener,
captureOffsetAndExecuteTestHandler);
endlessConsumer.start();