private boolean running = false;
private Exception exception;
private long consumed = 0;
- private KafkaConsumer<String, String> consumer = null;
+ private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+ private final Map<Integer, Long> offsets = new HashMap<>();
- private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
- repository.save(new StatisticsDocument(partition, removed));
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long newOffset = consumer.position(tp);
+ Long oldOffset = offsets.remove(partition);
+ log.info(
+ "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+ id,
+ partition,
+ newOffset - oldOffset,
+ oldOffset,
+ newOffset);
+ Map<String, Long> removed = seen.remove(partition);
+ for (String key : removed.keySet())
+ {
+ log.info(
+ "{} - Seen {} messages for partition={}|key={}",
+ id,
+ removed.get(key),
+ partition,
+ key);
+ }
++ repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+ });
+ }
- public EndlessConsumer(
- ExecutorService executor,
- PartitionStatisticsRepository repository,
- String bootstrapServer,
- String groupId,
- String clientId,
- String topic,
- String autoOffsetReset)
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
- this.executor = executor;
- this.repository = repository;
- this.bootstrapServer = bootstrapServer;
- this.groupId = groupId;
- this.id = clientId;
- this.topic = topic;
- this.autoOffsetReset = autoOffsetReset;
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long offset = consumer.position(tp);
+ log.info("{} - adding partition: {}, offset={}", id, partition, offset);
- offsets.put(partition, offset);
- seen.put(
- partition,
++ StatisticsDocument document =
+ repository
- .findById(Integer.toString(tp.partition()))
- .map(document -> document.statistics)
- .orElse(new HashMap<>()));
++ .findById(Integer.toString(partition))
++ .orElse(new StatisticsDocument(partition));
++ consumer.seek(tp, document.offset);
++ seen.put(partition, document.statistics);
+ });
}
+
@Override
public void run()
{
{
@Id
public String id;
- public Map<String, Integer> statistics;
+ public long offset;
+ public Map<String, Long> statistics;
public StatisticsDocument()
{
}
- public StatisticsDocument(Integer partition, Map<String, Long> statistics)
+ public StatisticsDocument(Integer partition)
+ {
+ this.id = Integer.toString(partition);
+ this.statistics = new HashMap<>();
+ }
+
- public StatisticsDocument(Integer partition, Map<String, Integer> statistics, long offset)
++ public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
{
this.id = Integer.toString(partition);
this.statistics = statistics;