package de.juplo.kafka;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
public class ApplicationConfiguration
{
@Bean
- public Consumer<ConsumerRecord<String, Long>> consumer()
- public WordcountRecordHandler wordcountRecordHandler()
++ public KeyCountingRecordHandler keyCountingRecordHandler()
{
- return (record) ->
- {
- // Handle record
- };
- return new WordcountRecordHandler();
++ return new KeyCountingRecordHandler();
+ }
+
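++ // Rebalance listener that loads and stores the counters and offsets
++ // from/to MongoDB, when partitions are assigned or revoked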
+ @Bean
- public WordcountRebalanceListener wordcountRebalanceListener(
- WordcountRecordHandler wordcountRecordHandler,
++ public KeyCountingRebalanceListener keyCountingRebalanceListener(
++ KeyCountingRecordHandler keyCountingRecordHandler,
+ PartitionStatisticsRepository repository,
- Consumer<String, String> consumer,
++ Consumer<String, Long> consumer,
+ ApplicationProperties properties)
+ {
- return new WordcountRebalanceListener(
- wordcountRecordHandler,
++ return new KeyCountingRebalanceListener(
++ keyCountingRecordHandler,
+ repository,
+ properties.getClientId(),
+ properties.getTopic(),
+ Clock.systemDefaultZone(),
+ properties.getCommitInterval(),
+ consumer);
}
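++ // The consumer loop itself stays generic: all state and offset handling
++ // is delegated to the rebalance listener and the record handler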
@Bean
- public EndlessConsumer<String, String> endlessConsumer(
- KafkaConsumer<String, String> kafkaConsumer,
+ public EndlessConsumer<String, Long> endlessConsumer(
+ KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
- Consumer<ConsumerRecord<String, Long>> handler,
- PartitionStatisticsRepository repository,
- WordcountRebalanceListener wordcountRebalanceListener,
- WordcountRecordHandler wordcountRecordHandler,
++ KeyCountingRebalanceListener keyCountingRebalanceListener,
++ KeyCountingRecordHandler keyCountingRecordHandler,
ApplicationProperties properties)
{
return
new EndlessConsumer<>(
executor,
- repository,
properties.getClientId(),
properties.getTopic(),
- Clock.systemDefaultZone(),
- properties.getCommitInterval(),
kafkaConsumer,
- handler);
- wordcountRebalanceListener,
- wordcountRecordHandler);
++ keyCountingRebalanceListener,
++ keyCountingRecordHandler);
}
@Bean
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
- import org.springframework.web.bind.annotation.ExceptionHandler;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.ResponseStatus;
- import org.springframework.web.bind.annotation.RestController;
-import org.springframework.http.ResponseEntity;
+ import org.springframework.web.bind.annotation.*;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class DriverController
{
private final EndlessConsumer consumer;
- private final WordcountRecordHandler wordcount;
++ private final KeyCountingRecordHandler keyCountingRecordHandler;
@PostMapping("start")
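++ // Returns the current statistics: partition -> key -> number of messages seen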
@GetMapping("seen")
- public Map<Integer, Map<String, Map<String, Long>>> seen()
+ public Map<Integer, Map<String, Long>> seen()
{
- return consumer.getSeen();
- return wordcount.getSeen();
- }
-
- @GetMapping("seen/{user}")
- public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
- {
- for (Map<String, Map<String, Long>> users : wordcount.getSeen().values())
- {
- Map<String, Long> words = users.get(user);
- if (words != null)
- return ResponseEntity.ok(words);
- }
-
- return ResponseEntity.notFound().build();
++ return keyCountingRecordHandler.getSeen();
}
--- /dev/null
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
++package de.juplo.kafka;
++
++import lombok.RequiredArgsConstructor;
++import lombok.extern.slf4j.Slf4j;
++import org.apache.kafka.clients.consumer.Consumer;
++import org.apache.kafka.common.TopicPartition;
++
++import java.time.Clock;
++import java.time.Duration;
++import java.time.Instant;
++import java.util.Collection;
++import java.util.Map;
++
++
++@RequiredArgsConstructor
++@Slf4j
++public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
++{
++ private final KeyCountingRecordHandler handler;
++ private final PartitionStatisticsRepository repository;
++ private final String id;
++ private final String topic;
++ private final Clock clock;
++ private final Duration commitInterval;
++ private final Consumer<String, Long> consumer;
++
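++ // Instant.EPOCH ensures, that the very first beforeNextPoll() already stores the state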
++ private Instant lastCommit = Instant.EPOCH;
++
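++ // Restore the counters for every newly assigned partition from MongoDB
++ // and seek to the stored offset, so that counting resumes where it left off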
++ @Override
++ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
++ {
++ partitions.forEach(tp ->
++ {
++ Integer partition = tp.partition();
++ Long offset = consumer.position(tp);
++ log.info("{} - adding partition: {}, offset={}", id, partition, offset);
++ StatisticsDocument document =
++ repository
++ .findById(Integer.toString(partition))
++ .orElse(new StatisticsDocument(partition));
++ if (document.offset >= 0)
++ {
++ // Only seek, if a stored offset was found
++ // Otherwise: Use initial offset, generated by Kafka
++ consumer.seek(tp, document.offset);
++ }
++ handler.addPartition(partition, document.statistics);
++ });
++ }
++
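++ // Hand the counters of every revoked partition back to MongoDB,
++ // together with the offset of the next unseen message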
++ @Override
++ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
++ {
++ partitions.forEach(tp ->
++ {
++ Integer partition = tp.partition();
++ Long newOffset = consumer.position(tp);
++ log.info(
++ "{} - removing partition: {}, offset of next message {})",
++ id,
++ partition,
++ newOffset);
++ Map<String, Long> removed = handler.removePartition(partition);
++ repository.save(new StatisticsDocument(partition, removed, newOffset));
++ });
++ }
++
++
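++ // Called by the consumer loop before each poll(): store counters and
++ // offsets in MongoDB, at most once per commit interval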
++ @Override
++ public void beforeNextPoll()
++ {
++ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
++ {
++ log.debug("Storing data and offsets, last commit: {}", lastCommit);
++ handler.getSeen().forEach((partition, statistics) -> repository.save(
++ new StatisticsDocument(
++ partition,
++ statistics,
++ consumer.position(new TopicPartition(topic, partition)))));
++ lastCommit = clock.instant();
++ }
++ }
++}
--- /dev/null
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
++package de.juplo.kafka;
++
++import lombok.extern.slf4j.Slf4j;
++import org.apache.kafka.clients.consumer.ConsumerRecord;
++
++import java.util.HashMap;
++import java.util.Map;
++
++
++@Slf4j
++public class KeyCountingRecordHandler implements RecordHandler<String, Long>
++{
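++ // Number of messages seen per key, for each currently assigned partition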
++ private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
++
++
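++ // Count the key of each consumed record; the map for the record's partition
++ // is registered by the rebalance listener before any record arrives from it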
++ @Override
++ public void accept(ConsumerRecord<String, Long> record)
++ {
++ Integer partition = record.partition();
++ String key = record.key() == null ? "NULL" : record.key();
++ Map<String, Long> byKey = seen.get(partition);
++
++ if (!byKey.containsKey(key))
++ byKey.put(key, 0L);
++
++ long seenByKey = byKey.get(key);
++ seenByKey++;
++ byKey.put(key, seenByKey);
++ }
++
++ public void addPartition(Integer partition, Map<String, Long> statistics)
++ {
++ seen.put(partition, statistics);
++ }
++
++ public Map<String, Long> removePartition(Integer partition)
++ {
++ return seen.remove(partition);
++ }
++
++
++ public Map<Integer, Map<String, Long>> getSeen()
++ {
++ return seen;
++ }
++}
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
--import java.time.Clock;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
ExecutorService executor;
@Autowired
PartitionStatisticsRepository repository;
- WordcountRebalanceListener wordcountRebalanceListener;
+ @Autowired
- WordcountRecordHandler wordcountRecordHandler;
++ KeyCountingRebalanceListener keyCountingRebalanceListener;
+ @Autowired
++ KeyCountingRecordHandler keyCountingRecordHandler;
- Consumer<ConsumerRecord<String, Long>> testHandler;
- EndlessConsumer<String, String> endlessConsumer;
+ EndlessConsumer<String, Long> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
- Set<ConsumerRecord<String, String>> receivedRecords;
+ Set<ConsumerRecord<String, Long>> receivedRecords;
/** Test methods */
Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
consumer.accept(tp, offset.orElse(0l));
});
-- }
++ }
List<TopicPartition> partitions()
{
newOffsets.put(tp, offset - 1);
});
- Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
- record ->
- {
- newOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- receivedRecords.add(record);
- testHandler.accept(record);
- TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
- new TestRecordHandler<String, String>(wordcountRecordHandler) {
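++ // Wrap the real handler, so that the test can record the offsets and
++ // messages that were consumed, without changing the counting logic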
++ TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
++ new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+ @Override
- public void onNewRecord(ConsumerRecord<String, String> record)
++ public void onNewRecord(ConsumerRecord<String, Long> record)
+ {
+ newOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ receivedRecords.add(record);
+ }
};
endlessConsumer =
new EndlessConsumer<>(
executor,
- repository,
properties.getClientId(),
properties.getTopic(),
- Clock.systemDefaultZone(),
- properties.getCommitInterval(),
kafkaConsumer,
- wordcountRebalanceListener,
++ keyCountingRebalanceListener,
captureOffsetAndExecuteTestHandler);
endlessConsumer.start();