public class ApplicationConfiguration
{
@Bean
- public KeyCountingRecordHandler messageCountingRecordHandler()
+ public ApplicationRecordHandler recordHandler()
{
- return new KeyCountingRecordHandler();
+ return new ApplicationRecordHandler();
}
@Bean
- public KeyCountingRebalanceListener wordcountRebalanceListener(
- KeyCountingRecordHandler keyCountingRecordHandler,
- PartitionStatisticsRepository repository,
+ public ApplicationRebalanceListener rebalanceListener(
+ ApplicationRecordHandler recordHandler,
+ StateRepository stateRepository,
Consumer<String, Long> consumer,
ApplicationProperties properties)
{
- return new KeyCountingRebalanceListener(
- keyCountingRecordHandler,
- repository,
+ return new ApplicationRebalanceListener(
+ recordHandler,
+ stateRepository,
properties.getClientId(),
properties.getTopic(),
Clock.systemDefaultZone(),
properties.getCommitInterval(),
consumer);
}

@Bean
public EndlessConsumer<String, Long> endlessConsumer(
KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
- KeyCountingRebalanceListener keyCountingRebalanceListener,
- KeyCountingRecordHandler keyCountingRecordHandler,
+ ApplicationRebalanceListener rebalanceListener,
+ ApplicationRecordHandler recordHandler,
ApplicationProperties properties)
{
return
new EndlessConsumer<>(
executor,
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- keyCountingRebalanceListener,
- keyCountingRecordHandler);
+ rebalanceListener,
+ recordHandler);
}
@Bean
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
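+ // All dependencies are injected via the Lombok-generated constructor
+ // (@RequiredArgsConstructor): the record handler keeps the per-partition
+ // state in memory, while the StateRepository persists it in MongoDB.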
+ private final ApplicationRecordHandler recordHandler;
+ private final StateRepository stateRepository;
+ private final String id;
+ private final String topic;
+ private final Clock clock;
+ private final Duration commitInterval;
+ private final Consumer<String, Long> consumer;
+
+ private Instant lastCommit = Instant.EPOCH;
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long offset = consumer.position(tp);
+ log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+ StateDocument document =
+ stateRepository
+ .findById(Integer.toString(partition))
+ .orElse(new StateDocument(partition));
+ if (document.offset >= 0)
+ {
+ // Only seek, if a stored offset was found
+ // Otherwise: Use initial offset, generated by Kafka
+ consumer.seek(tp, document.offset);
+ }
+ recordHandler.addPartition(partition, document.state);
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long newOffset = consumer.position(tp);
+ log.info(
+ "{} - removing partition: {}, offset of next message {})",
+ id,
+ partition,
+ newOffset);
+ Map<String, Long> removed = recordHandler.removePartition(partition);
+ stateRepository.save(new StateDocument(partition, removed, newOffset));
+ });
+ }
+
+
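+ // Periodically persists the collected state together with the offset of
+ // the next message per partition, so that a restarted instance can resume
+ // exactly where this one left off.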
+ @Override
+ public void beforeNextPoll()
+ {
+ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+ {
+ log.debug("Storing data and offsets, last commit: {}", lastCommit);
+ recordHandler.getState().forEach((partition, state) -> stateRepository.save(
+ new StateDocument(
+ partition,
+ state,
+ consumer.position(new TopicPartition(topic, partition)))));
+ lastCommit = clock.instant();
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
+{
+ private final Map<Integer, Map<String, Long>> state = new HashMap<>();
+
+
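+ // Counts how often each key has been seen, grouped by partition. The inner
+ // map is guaranteed to exist here, because the rebalance listener calls
+ // addPartition() before any record of that partition reaches accept().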
+ @Override
+ public void accept(ConsumerRecord<String, Long> record)
+ {
+ Integer partition = record.partition();
+ String key = record.key() == null ? "NULL" : record.key().toString();
+ Map<String, Long> byKey = state.get(partition);
+
+ if (!byKey.containsKey(key))
+ byKey.put(key, 0L);
+
+ long seenByKey = byKey.get(key);
+ seenByKey++;
+ byKey.put(key, seenByKey);
+ }
+
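+ // Only the rebalance listener installs and removes the per-partition maps,
+ // hence the protected visibility.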
+ protected void addPartition(Integer partition, Map<String, Long> state)
+ {
+ this.state.put(partition, state);
+ }
+
+ protected Map<String, Long> removePartition(Integer partition)
+ {
+ return this.state.remove(partition);
+ }
+
+
+ public Map<Integer, Map<String, Long>> getState()
+ {
+ return state;
+ }
+}
public class DriverController
{
private final EndlessConsumer consumer;
- private final KeyCountingRecordHandler keyCountingRecordHandler;
+ private final ApplicationRecordHandler recordHandler;
@PostMapping("start")
}
- @GetMapping("seen")
- public Map<Integer, Map<String, Long>> seen()
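+ // Returns the state keyed by partition number and then by record key,
+ // e.g. { "0": { "alice": 3 }, "1": { "bob": 7 } } (values hypothetical).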
+ @GetMapping("state")
+ public Map<Integer, Map<String, Long>> state()
{
- return keyCountingRecordHandler.getSeen();
+ return recordHandler.getState();
}
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
- private final RecordHandler<K, V> handler;
+ private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+ private final RecordHandler<K, V> recordHandler;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
+ consumer.subscribe(Arrays.asList(topic), rebalanceListener);
while (true)
{
record.value()
);
- handler.accept(record);
+ recordHandler.accept(record);
consumed++;
}
- pollIntervalAwareRebalanceListener.beforeNextPoll();
+ rebalanceListener.beforeNextPoll();
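+ // Gives the listener the chance to store state and offsets between polls.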
}
}
catch(WakeupException e)
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
-{
- private final KeyCountingRecordHandler handler;
- private final PartitionStatisticsRepository repository;
- private final String id;
- private final String topic;
- private final Clock clock;
- private final Duration commitInterval;
- private final Consumer<String, Long> consumer;
-
- private Instant lastCommit = Instant.EPOCH;
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long offset = consumer.position(tp);
- log.info("{} - adding partition: {}, offset={}", id, partition, offset);
- StatisticsDocument document =
- repository
- .findById(Integer.toString(partition))
- .orElse(new StatisticsDocument(partition));
- if (document.offset >= 0)
- {
- // Only seek, if a stored offset was found
- // Otherwise: Use initial offset, generated by Kafka
- consumer.seek(tp, document.offset);
- }
- handler.addPartition(partition, document.statistics);
- });
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- Long newOffset = consumer.position(tp);
- log.info(
- "{} - removing partition: {}, offset of next message {})",
- id,
- partition,
- newOffset);
- Map<String, Long> removed = handler.removePartition(partition);
- repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
- });
- }
-
-
- @Override
- public void beforeNextPoll()
- {
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data and offsets, last commit: {}", lastCommit);
- handler.getSeen().forEach((partiton, statistics) -> repository.save(
- new StatisticsDocument(
- partiton,
- statistics,
- consumer.position(new TopicPartition(topic, partiton)))));
- lastCommit = clock.instant();
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class KeyCountingRecordHandler implements RecordHandler<String, Long>
-{
- private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-
-
- @Override
- public void accept(ConsumerRecord<String, Long> record)
- {
- Integer partition = record.partition();
- String key = record.key() == null ? "NULL" : record.key().toString();
- Map<String, Long> byKey = seen.get(partition);
-
- if (!byKey.containsKey(key))
- byKey.put(key, 0l);
-
- long seenByKey = byKey.get(key);
- seenByKey++;
- byKey.put(key, seenByKey);
- }
-
- public void addPartition(Integer partition, Map<String, Long> statistics)
- {
- seen.put(partition, statistics);
- }
-
- public Map<String, Long> removePartition(Integer partition)
- {
- return seen.remove(partition);
- }
-
-
- public Map<Integer, Map<String, Long>> getSeen()
- {
- return seen;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
-{
- public Optional<StatisticsDocument> findById(String partition);
-}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Document(collection = "state")
+@ToString
+public class StateDocument
+{
+ @Id
+ public String id;
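+ // -1 signals that no offset has been stored yet; the rebalance listener
+ // then sticks with the initial offset assigned by Kafka.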
+ public long offset = -1L;
+ public Map<String, Long> state;
+
+ public StateDocument()
+ {
+ }
+
+ public StateDocument(Integer partition)
+ {
+ this.id = Integer.toString(partition);
+ this.state = new HashMap<>();
+ }
+
+ public StateDocument(Integer partition, Map<String, Long> state, long offset)
+ {
+ this.id = Integer.toString(partition);
+ this.state = state;
+ this.offset = offset;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
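+// Spring Data MongoDB derives the implementation at runtime; the stringified
+// partition number serves as the document id.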
+public interface StateRepository extends MongoRepository<StateDocument, String>
+{
+ public Optional<StateDocument> findById(String partition);
+}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Document(collection = "statistics")
-@ToString
-public class StatisticsDocument
-{
- @Id
- public String id;
- public long offset = -1l;
- public Map<String, Long> statistics;
-
- public StatisticsDocument()
- {
- }
-
- public StatisticsDocument(Integer partition)
- {
- this.id = Integer.toString(partition);
- this.statistics = new HashMap<>();
- }
-
- public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
- {
- this.id = Integer.toString(partition);
- this.statistics = statistics;
- this.offset = offset;
- }
-}
@Autowired
KafkaConsumer<Bytes, Bytes> offsetConsumer;
@Autowired
- PartitionStatisticsRepository partitionStatisticsRepository;
- @Autowired
ApplicationProperties properties;
@Autowired
ExecutorService executor;
@Autowired
- PartitionStatisticsRepository repository;
+ StateRepository stateRepository;
@Autowired
- KeyCountingRebalanceListener keyCountingRebalanceListener;
+ ApplicationRebalanceListener rebalanceListener;
@Autowired
- KeyCountingRecordHandler keyCountingRecordHandler;
+ ApplicationRecordHandler recordHandler;
EndlessConsumer<String, Long> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
Long offset = offsetConsumer.position(tp);
log.info("New position for {}: {}", tp, offset);
Integer partition = tp.partition();
- StatisticsDocument document =
- partitionStatisticsRepository
+ StateDocument document =
+ stateRepository
.findById(partition.toString())
- .orElse(new StatisticsDocument(partition));
+ .orElse(new StateDocument(partition));
document.offset = offset;
- partitionStatisticsRepository.save(document);
+ stateRepository.save(document);
});
offsetConsumer.unsubscribe();
}
partitions().forEach(tp ->
{
String partition = Integer.toString(tp.partition());
- Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+ Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
consumer.accept(tp, offset.orElse(0l));
});
}
});
TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
- new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+ new TestRecordHandler<String, Long>(recordHandler) {
@Override
public void onNewRecord(ConsumerRecord<String, Long> record)
{
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- keyCountingRebalanceListener,
+ rebalanceListener,
captureOffsetAndExecuteTestHandler);
endlessConsumer.start();