1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
10 import java.util.concurrent.CompletableFuture;
13 @RequiredArgsConstructor
15 public class ChannelTaskExecutor
17 private final ThreadPoolTaskExecutor taskExecutor;
19 private final Channel channel;
20 private final Consumer<String, AbstractMessageTo> consumer;
21 private final WorkAssignor workAssignor;
23 CompletableFuture<Void> channelTaskJob;
26 public void executeChannelTask()
28 workAssignor.assignWork(consumer);
29 log.info("Starting the consumer-task for {}", channel);
30 channelTaskJob = taskExecutor
31 .submitCompletable(channel)
34 log.error("The consumer-task for {} exited abnormally!", channel, e);
41 log.info("Signaling the consumer-task for {} to quit its work", channel);
43 log.info("Waiting for the consumer of {} to finish its work", channel);
44 channelTaskJob.join();
45 log.info("Joined the consumer-task for {}", channel);