+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.concurrent.CompletableFuture;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ChannelExecutor
-{
- private final ThreadPoolTaskExecutor taskExecutor;
- @Getter
- private final Channel channel;
- private final Consumer<String, AbstractMessageTo> consumer;
- private final WorkAssignor workAssignor;
-
- CompletableFuture<Void> consumerTaskJob;
-
-
- public void executeConsumerTask()
- {
- workAssignor.assignWork(consumer);
- log.info("Starting the consumer-task for {}", channel);
- consumerTaskJob = taskExecutor
- .submitCompletable(channel)
- .exceptionally(e ->
- {
- log.error("The consumer-task for {} exited abnormally!", channel, e);
- return null;
- });
- }
-
- @PreDestroy
- public void joinConsumerTaskJob()
- {
- log.info("Signaling the consumer-task for {} to quit its work", channel);
- consumer.wakeup();
- log.info("Waiting for the consumer of {} to finish its work", channel);
- consumerTaskJob.join();
- log.info("Joined the consumer-task for {}", channel);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.implementation.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ChannelRunner
-{
- private final ChannelExecutor infoChannelExecutor;
- private final ChannelExecutor dataChannelExecutor;
-
- public void executeChannel()
- {
- infoChannelExecutor.executeConsumerTask();
- dataChannelExecutor.executeConsumerTask();
- }
-
- public void joinChannel() throws InterruptedException
- {
- dataChannelExecutor.joinConsumerTaskJob();
- while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
- {
- log.info("Waiting for {} to shut down...", infoChannel);
- Thread.sleep(1000);
- }
- infoChannelExecutor.joinConsumerTaskJob();
- }
-
- private void joinChannel()
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ChannelExecutor
+{
+ private final ThreadPoolTaskExecutor taskExecutor;
+ @Getter
+ private final Channel channel;
+ private final Consumer<String, AbstractMessageTo> consumer;
+ private final WorkAssignor workAssignor;
+
+ CompletableFuture<Void> consumerTaskJob;
+
+
+ public void executeConsumerTask()
+ {
+ workAssignor.assignWork(consumer);
+ log.info("Starting the consumer-task for {}", channel);
+ consumerTaskJob = taskExecutor
+ .submitCompletable(channel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer-task for {} exited abnormally!", channel, e);
+ return null;
+ });
+ }
+
+ @PreDestroy
+ public void joinConsumerTaskJob()
+ {
+ log.info("Signaling the consumer-task for {} to quit its work", channel);
+ consumer.wakeup();
+ log.info("Waiting for the consumer of {} to finish its work", channel);
+ consumerTaskJob.join();
+ log.info("Joined the consumer-task for {}", channel);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ChannelRunner
+{
+ private final ChannelExecutor infoChannelExecutor;
+ private final ChannelExecutor dataChannelExecutor;
+
+ public void executeChannel()
+ {
+ infoChannelExecutor.executeConsumerTask();
+ dataChannelExecutor.executeConsumerTask();
+ }
+
+ public void joinChannel() throws InterruptedException
+ {
+ dataChannelExecutor.joinConsumerTaskJob();
+ while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
+ {
+ log.info("Waiting for {} to shut down...", infoChannel);
+ Thread.sleep(1000);
+ }
+ infoChannelExecutor.joinConsumerTaskJob();
+ }
+
+ private void joinChannel()
+}