+ private long consumed = 0;
+
+ @KafkaListener(
+ id = "${consumer.client-id}",
+ idIsGroup = false,
+ topics = "${consumer.topic}",
+ containerFactory = "batchFactory",
+ autoStartup = "false")
+ public void receive(List<ConsumerRecord<K, V>> records)
+ {
+ // Do something with the data...
+ log.info("{} - Received {} messages", id, records.size());
+ for (ConsumerRecord<K, V> record : records)
+ {
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.key(),
+ record.value()
+ );
+
+ handler.accept(record);
+
+ consumed++;
+ }
+ }
+
+
+ public synchronized void start()
+ {
+ if (registry.getListenerContainer(id).isChildRunning())
+ throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
+ log.info("{} - Starting - consumed {} messages before", id, consumed);
+ registry.getListenerContainer(id).start();
+ }