- private KafkaConsumer<String, String> consumer;
- private Lock lock = new ReentrantLock();
- private Condition stopped = lock.newCondition();
+
+ @KafkaListener(
+ id = "${consumer.client-id}",
+ idIsGroup = false,
+ topics = "${consumer.topic}",
+ containerFactory = "batchFactory",
+ autoStartup = "false")
+ public void receive(List<ConsumerRecord<K, V>> records)
+ {
+ // Do something with the data...
+ log.info("{} - Received {} messages", id, records.size());
+ for (ConsumerRecord<K, V> record : records)
+ {
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.key(),
+ record.value()
+ );
+
+ handler.accept(record);
+
+ consumed++;
+ }
+ }