* Ziel, näher an die Funktion des Vanilla-EndlessConsumer heranrücken.
Der Testfall soll möglichst unverändert funktionieren.
* Unklar, ob das für die Schulung hilfreich ist.
* Hilft aber definitiv beim Verstehen der Mechanismen von Spring Kafka
* Hier wurde jetzt erst mal der automatische Start des Containers
unterbunden und stattdessen `Application` wieder zu einem
`ApplicationRunner` mit `@PreDestroy`-Methode gemacht.
* TODO: Testfall ist darüber erst mal ROT, da der Container dort jetzt
nicht gestartet wird! Das war ja aber genau das Ziel, denn bei den
übernommenen Testfällen gab es (zumindest theoretisch) Race-Conditions,
weil der Container - anders als bei der Vanilla-Implementierung - immer
schon lief.
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+
+import javax.annotation.PreDestroy;
@SpringBootApplication
@Slf4j
@SpringBootApplication
@Slf4j
-public class Application
+public class Application implements ApplicationRunner
+ @Autowired
+ KafkaListenerEndpointRegistry registry;
+ @Value("${consumer.client-id}")
+ String clientId;
+
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception
+ {
+ log.info("Starting EndlessConsumer");
+ this.registry.getListenerContainer(clientId).start();
+ }
+
+ @PreDestroy
+ public void stopExecutor()
+ {
+ log.info("Stopping EndlessConsumer");
+ this.registry.getListenerContainer(clientId).stop();
+ }
+
+
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
Consumer<ConsumerRecord<K, V>> handler;
Consumer<ConsumerRecord<K, V>> handler;
- @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory")
+ @KafkaListener(
+ id = "${consumer.client-id}",
+ idIsGroup = false,
+ topics = "${consumer.topic}",
+ containerFactory = "batchFactory",
+ autoStartup = "false")
public void receive(List<ConsumerRecord<K, V>> records)
{
// Do something with the data...
public void receive(List<ConsumerRecord<K, V>> records)
{
// Do something with the data...