package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
@RequiredArgsConstructor
@Slf4j
-public class ConsumerTaskExecutor
+public class ChannelTaskExecutor
{
private final ThreadPoolTaskExecutor taskExecutor;
- private final Runnable consumerTask;
+ @Getter
+ private final Channel channel;
private final Consumer<String, AbstractMessageTo> consumer;
private final WorkAssignor workAssignor;
- CompletableFuture<Void> consumerTaskJob;
+ CompletableFuture<Void> channelTaskJob;
- public void executeConsumerTask()
+ public void executeChannelTask()
{
workAssignor.assignWork(consumer);
- log.info("Starting the consumer-task for {}", consumerTask);
- consumerTaskJob = taskExecutor
- .submitCompletable(consumerTask)
+ log.info("Starting the consumer-task for {}", channel);
+ channelTaskJob = taskExecutor
+ .submitCompletable(channel)
.exceptionally(e ->
{
- log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+ log.error("The consumer-task for {} exited abnormally!", channel, e);
return null;
});
}
- @PreDestroy
- public void joinConsumerTaskJob()
+ public void join()
{
- log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+ log.info("Signaling the consumer-task for {} to quit its work", channel);
consumer.wakeup();
- log.info("Waiting for the consumer of {} to finish its work", consumerTask);
- consumerTaskJob.join();
- log.info("Joined the consumer-task for {}", consumerTask);
+ log.info("Waiting for the consumer of {} to finish its work", channel);
+ channelTaskJob.join();
+ log.info("Joined the consumer-task for {}", channel);
}
}