1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
7 @RequiredArgsConstructor
9 public class ChannelTaskRunner
11 private final ChannelTaskExecutor infoChannelTaskExecutor;
12 private final ChannelTaskExecutor dataChannelTaskExecutor;
14 public void executeChannels()
16 infoChannelTaskExecutor.executeChannelTask();
17 dataChannelTaskExecutor.executeChannelTask();
20 public void joinChannels() throws InterruptedException
22 joinChannel(dataChannelTaskExecutor);
23 joinChannel(infoChannelTaskExecutor);
26 private void joinChannel(
27 ChannelTaskExecutor channelTaskExecutor)
28 throws InterruptedException
30 Channel channel = channelTaskExecutor.getChannel();
31 while (channel.getChannelState() != ChannelState.SHUTTING_DOWN)
33 log.info("Waiting for {} to shut down...", channel);
36 channelTaskExecutor.join();