import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;
-import javax.annotation.PreDestroy;
-import java.util.List;
+import java.util.Optional;
import java.util.function.Consumer;
String id;
@Autowired
Consumer<ConsumerRecord<K, V>> handler;
+ @Autowired
+ ApplicationErrorHandler errorHandler;
private long consumed = 0;
id = "${consumer.client-id}",
idIsGroup = false,
topics = "${consumer.topic}",
- containerFactory = "batchFactory",
autoStartup = "false")
- public void receive(List<ConsumerRecord<K, V>> records)
+ public void receive(ConsumerRecord<K, V> record)
{
- // Do something with the data...
- log.info("{} - Received {} messages", id, records.size());
- for (ConsumerRecord<K, V> record : records)
- {
- log.info(
- "{} - {}: {}/{} - {}={}",
- id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
- );
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.key(),
+ record.value()
+ );
- handler.accept(record);
+ handler.accept(record);
- consumed++;
- }
+ consumed++;
}
public synchronized void start()
{
+ if (registry.getListenerContainer(id).isChildRunning())
+ throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
log.info("{} - Starting - consumed {} messages before", id, consumed);
+ errorHandler.clearException();
registry.getListenerContainer(id).start();
}
public synchronized void stop()
{
+ if (!registry.getListenerContainer(id).isChildRunning())
+ throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
log.info("{} - Stopping", id);
registry.getListenerContainer(id).stop();
log.info("{} - Stopped - consumed {} messages so far", id, consumed);
}
- @PreDestroy
- public void destroy()
+ public synchronized Optional<Exception> exitStatus()
{
- log.info("{} - Destroy!", id);
- stop();
+ if (registry.getListenerContainer(id).isChildRunning())
+ throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+
+ return errorHandler.getException();
}
}