--- /dev/null
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskRunner
+{
+ private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
+ private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+
+ private final InfoChannel infoChannel;
+
+ void run() throws InterruptedException
+ {
+ infoChannelConsumerTaskExecutor.executeConsumerTask();
+
+ while (infoChannel.loadInProgress())
+ {
+ log.info("InfoChannel is still loading...");
+ Thread.sleep(1000);
+ }
+
+ dataChannelConsumerTaskExecutor.executeConsumerTask();
+ }
+
+ void joinTasks()
+ {
+ dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ }
+}
package de.juplo.kafka.chat.backend.implementation.kafka;
+import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@Slf4j
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
- private final String infoTopic;
- private final ThreadPoolTaskExecutor taskExecutor;
- private final InfoChannel infoChannel;
- private final DataChannel dataChannel;
- private final Consumer<String, AbstractMessageTo> infoChannelConsumer;
- private final Consumer<String, AbstractMessageTo> dataChannelConsumer;
- private final WorkAssignor workAssignor;
-
- CompletableFuture<Void> infoChannelConsumerJob;
- CompletableFuture<Void> dataChannelConsumerJob;
->>>>>>> 7fb62d3 (WIP:ALIGN)
+ private final ConsumerTaskRunner consumerTaskRunner;
@Override
public void run(ApplicationArguments args) throws Exception
{
-<<<<<<< HEAD
- chatRoomChannelTaskExecutor.executeConsumerTask();
-=======
- List<TopicPartition> partitions = infoChannelConsumer
- .partitionsFor(infoTopic)
- .stream()
- .map(partitionInfo -> new TopicPartition(
- infoTopic,
- partitionInfo.partition()))
- .toList();
- infoChannelConsumer.assign(partitions);
- log.info("Starting the consumer for the InfoChannel");
- infoChannelConsumerJob = taskExecutor
- .submitCompletable(infoChannel)
- .exceptionally(e ->
- {
- log.error("The consumer for the InfoChannel exited abnormally!", e);
- return null;
- });
-
- while (infoChannel.loadInProgress())
- {
- log.info("InfoChannel is still loading...");
- Thread.sleep(1000);
- }
-
- workAssignor.assignWork(dataChannelConsumer);
- log.info("Starting the consumer for the DataChannel");
- dataChannelConsumerJob = taskExecutor
- .submitCompletable(dataChannel)
- .exceptionally(e ->
- {
- log.error("The consumer for the DataChannel exited abnormally!", e);
- return null;
- });
+ consumerTaskRunner.run();
}
@PreDestroy
- public void joinChatRoomChannelConsumerJob()
- {
- log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- infoChannelConsumer.wakeup();
- log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
- dataChannelConsumerJob.join();
- log.info("Joined the consumer of the ChatRoomChannel");
- }
-
-
- interface WorkAssignor
+ public void joinConsumerTasks()
{
- void assignWork(Consumer<?, ?> consumer);
+ consumerTaskRunner.joinTasks();
}
}
public class KafkaServicesConfiguration
{
@Bean
- ConsumerTaskExecutor infoChannelTaskExecutor(
+ ConsumerTaskRunner consumerTaskRunner(
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+ InfoChannel infoChannel)
+ {
+ return new ConsumerTaskRunner(
+ infoChannelConsumerTaskExecutor,
+ dataChannelConsumerTaskExecutor,
+ infoChannel);
+ }
+
+ @Bean
+ ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
InfoChannel infoChannel,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
}
@Bean
- ConsumerTaskExecutor dataChannelTaskExecutor(
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
DataChannel dataChannel,
Consumer<String, AbstractMessageTo> dataChannelConsumer,