--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskExecutor
+{
+ private final ThreadPoolTaskExecutor taskExecutor;
+ private final Runnable consumerTask;
+ private final Consumer<String, AbstractMessageTo> consumer;
+ private final WorkAssignor workAssignor;
+
+ CompletableFuture<Void> consumerTaskJob;
+
+
+ public void executeConsumerTask()
+ {
+ workAssignor.assignWork(consumer);
+ log.info("Starting the consumer-task for {}", consumerTask);
+ consumerTaskJob = taskExecutor
+ .submitCompletable(consumerTask)
+ .exceptionally(e ->
+ {
+ log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+ return null;
+ });
+ }
+
+ @PreDestroy
+ public void joinConsumerTaskJob()
+ {
+ log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+ consumer.wakeup();
+ log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+ consumerTaskJob.join();
+ log.info("Joined the consumer-task for {}", consumerTask);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskRunner
+{
+ private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
+ private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+ private final InfoChannel infoChannel;
+
+ public void executeConsumerTasks()
+ {
+ infoChannelConsumerTaskExecutor.executeConsumerTask();
+ dataChannelConsumerTaskExecutor.executeConsumerTask();
+ }
+
+ public void joinConsumerTasks() throws InterruptedException
+ {
+ dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
+ {
+ log.info("Waiting for {} to shut down...", infoChannel);
+ Thread.sleep(1000);
+ }
+ infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.concurrent.CompletableFuture;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ConsumerTaskExecutor
-{
- private final ThreadPoolTaskExecutor taskExecutor;
- private final Runnable consumerTask;
- private final Consumer<String, AbstractMessageTo> consumer;
- private final WorkAssignor workAssignor;
-
- CompletableFuture<Void> consumerTaskJob;
-
-
- public void executeConsumerTask()
- {
- workAssignor.assignWork(consumer);
- log.info("Starting the consumer-task for {}", consumerTask);
- consumerTaskJob = taskExecutor
- .submitCompletable(consumerTask)
- .exceptionally(e ->
- {
- log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
- return null;
- });
- }
-
- @PreDestroy
- public void joinConsumerTaskJob()
- {
- log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
- consumer.wakeup();
- log.info("Waiting for the consumer of {} to finish its work", consumerTask);
- consumerTaskJob.join();
- log.info("Joined the consumer-task for {}", consumerTask);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ConsumerTaskRunner
-{
- private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
- private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
- private final InfoChannel infoChannel;
-
- public void executeConsumerTasks()
- {
- infoChannelConsumerTaskExecutor.executeConsumerTask();
- dataChannelConsumerTaskExecutor.executeConsumerTask();
- }
-
- public void joinConsumerTasks() throws InterruptedException
- {
- dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
- while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
- {
- log.info("Waiting for {} to shut down...", infoChannel);
- Thread.sleep(1000);
- }
- infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
- }
-}