- log.info("{} - Commiting offsets for all previously assigned partitions", id);
- CountDownLatch commitDone = new CountDownLatch(1);
- consumer.commitAsync((offsets, e) ->
- {
- commitDone.countDown();
- if (e == null)
- {
- log.error("{} - Could not commit offsets to Kafka!", id, e);
- }
- else
- {
- offsets.entrySet().stream().forEach(entry ->
- {
- log.info("{} - Commited offset for {}: {}", id, entry.getKey(), entry.getValue());
- });
- }
- });
-