+ log.info("{} - Commiting offsets for all previously assigned partitions", id);
+ CountDownLatch commitDone = new CountDownLatch(1);
+ consumer.commitAsync((offsets, e) ->
+ {
+ commitDone.countDown();
+ if (e == null)
+ {
+ log.error("{} - Could not commit offsets to Kafka!", id, e);
+ }
+ else
+ {
+ offsets.entrySet().stream().forEach(entry ->
+ {
+ log.info("{} - Commited offset for {}: {}", id, entry.getKey(), entry.getValue());
+ });
+ }
+ });
+