import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
+import java.util.List;
import java.util.function.Consumer;
Consumer<ConsumerRecord<K, V>> handler;
- @KafkaListener(topics = "${consumer.topic}")
- public void receive(ConsumerRecord<K, V> record)
+ @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory")
+ public void receive(List<ConsumerRecord<K, V>> records)
{
- log.info(
- "{} - {}: {}/{} - {}={}",
- id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
- );
+ // Do something with the data...
+ log.info("{} - Received {} messages", id, records.size());
+ for (ConsumerRecord<K, V> record : records)
+ {
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.key(),
+ record.value()
+ );
- handler.accept(record);
+ handler.accept(record);
+ }
}
}