- private final Consumer<String, String> consumer;
+ private final Consumer<K, V> consumer;
+ private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
consumer.poll(Duration.ofSeconds(1));
// Do something with the data...
log.info("{} - Received {} messages", id, records.count());
consumer.poll(Duration.ofSeconds(1));
// Do something with the data...
log.info("{} - Received {} messages", id, records.count());
Map<String, Long> byKey = seen.get(partition);
if (!byKey.containsKey(key))
Map<String, Long> byKey = seen.get(partition);
if (!byKey.containsKey(key))