package de.juplo.kafka;
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.concurrent.ListenableFuture;
-
-import java.util.concurrent.ExecutionException;
@SpringBootApplication
-@Slf4j
-public class Application implements ApplicationRunner
+public class Application
{
- @Autowired
- ThreadPoolTaskExecutor taskExecutor;
- @Autowired
- Consumer<?, ?> kafkaConsumer;
- @Autowired
- ExampleConsumer exampleConsumer;
- @Autowired
- ConfigurableApplicationContext context;
-
- ListenableFuture<Integer> consumerJob;
-
- @Override
- public void run(ApplicationArguments args) throws Exception
- {
- log.info("Starting ExampleConsumer");
- consumerJob = taskExecutor.submitListenable(exampleConsumer);
- consumerJob.addCallback(
- exitStatus ->
- {
- log.info("ExampleConsumer exited normally, exit-status: {}", exitStatus);
- SpringApplication.exit(context, () -> exitStatus);
- },
- t ->
- {
- log.error("ExampleConsumer exited abnormally!", t);
- SpringApplication.exit(context, () -> 2);
- });
- }
-
- @PreDestroy
- public void shutdown() throws ExecutionException, InterruptedException
- {
- log.info("Signaling ExampleConsumer to quit its work");
- kafkaConsumer.wakeup();
- log.info("Waiting for ExampleConsumer to finish its work");
- consumerJob.get();
- log.info("ExampleConsumer finished its work");
- }
-
-
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
{
return
new ExampleConsumer(
+ kafkaConsumer,
kafkaProperties.getClientId(),
- applicationProperties.getTopics(),
- kafkaConsumer);
+ applicationProperties.getTopics());
}
@Bean
package de.juplo.kafka;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Arrays;
-import java.util.concurrent.Callable;
@Slf4j
-@RequiredArgsConstructor
-public class ExampleConsumer implements Callable<Integer>
+public class ExampleConsumer implements Runnable
{
private final String id;
private final String[] topics;
private final Consumer<String, String> consumer;
+ private final Thread workerThread;
+ private volatile boolean running = false;
private long consumed = 0;
+ public ExampleConsumer(
+ Consumer<String, String> consumer,
+ String clientId,
+ String... topics)
+ {
+ this.id = clientId;
+ this.topics = topics;
+ this.consumer = consumer;
+
+ workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
+ workerThread.start();
+ }
+
+
@Override
- public Integer call()
+ public void run()
{
try
{
log.info("{} - Subscribing to topics: {}", id, topics);
consumer.subscribe(Arrays.asList(topics));
+ running = true;
- while (true)
+ while (running)
{
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
catch(WakeupException e)
{
log.info("{} - Consumer was signaled to finish its work", id);
- return 0;
}
catch(Exception e)
{
log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString());
consumer.unsubscribe();
- return 1;
}
finally
{
- log.info("{} - Closing the KafkaConsumer", id);
- consumer.close();
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
}
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
}
+
+
+ public void shutdown() throws InterruptedException
+ {
+ log.info("{} joining the worker-thread...", id);
+ running = false;
+ consumer.wakeup();
+ workerThread.join();
+ }
}