import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;
+import javax.annotation.PreDestroy;
+import java.util.List;
import java.util.function.Consumer;
@RequiredArgsConstructor
public class EndlessConsumer<K, V>
{
+ @Autowired
+ private KafkaListenerEndpointRegistry registry;
@Value("${consumer.client-id}")
String id;
@Autowired
Consumer<ConsumerRecord<K, V>> handler;
+ private long consumed = 0;
+
+ @KafkaListener(
+ id = "${consumer.client-id}",
+ idIsGroup = false,
+ topics = "${consumer.topic}",
+ containerFactory = "batchFactory",
+ autoStartup = "false")
+ public void receive(List<ConsumerRecord<K, V>> records)
+ {
+ // Do something with the data...
+ log.info("{} - Received {} messages", id, records.size());
+ for (ConsumerRecord<K, V> record : records)
+ {
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.key(),
+ record.value()
+ );
+
+ handler.accept(record);
+
+ consumed++;
+ }
+ }
+
+
+ public synchronized void start()
+ {
+ if (registry.getListenerContainer(id).isChildRunning())
+ throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
+ log.info("{} - Starting - consumed {} messages before", id, consumed);
+ registry.getListenerContainer(id).start();
+ }
+
+ public synchronized void stop()
+ {
+ if (!registry.getListenerContainer(id).isChildRunning())
+ throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
+ log.info("{} - Stopping", id);
+ registry.getListenerContainer(id).stop();
+ log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+ }
- @KafkaListener(topics = "${consumer.topic}")
- public void receive(ConsumerRecord<K, V> record)
+ @PreDestroy
+ public void destroy()
{
- log.info(
- "{} - {}: {}/{} - {}={}",
- id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
- );
-
- handler.accept(record);
+ log.info("{} - Destroy!", id);
+ try
+ {
+ stop();
+ }
+ catch (IllegalStateException e)
+ {
+ log.info("{} - Was already stopped", id);
+ }
+ catch (Exception e)
+ {
+ log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
+ }
+ finally
+ {
+ log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+ }
}
}