public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
{
private final ExecutorService executor;
+ private final PartitionStatisticsRepository repository;
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
partition,
key);
}
+ repository.save(new StatisticsDocument(partition, removed));
});
}
Long offset = consumer.position(tp);
log.info("{} - adding partition: {}, offset={}", id, partition, offset);
offsets.put(partition, offset);
- seen.put(partition, new HashMap<>());
+ seen.put(
+ partition,
+ repository
+ .findById(Integer.toString(tp.partition()))
+ .map(document -> document.statistics)
+ .orElse(new HashMap<>()));
});
}