private String id;
private long consumed = 0;
- @KafkaListener(topics = "${juplo.consumer.topic}")
+ @KafkaListener(topics = "${juplo.consumer.topic}", concurrency = "2")
private void receive(
@Header(KafkaHeaders.RECEIVED_TOPIC)
String topic,
String key,
String value)
{
+ // BEACHTE:
+ // Der Zugriff auf die Variable "consumed" ist hier so nicht mehr korrekt!
+ // Grund: Der Zugriff erfolgt ohne Locking konkurrierend aus zwei Threads.
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
}