@RequiredArgsConstructor
@Slf4j
-public class ConsumerTaskRunner
+public class ChannelTaskRunner
{
- private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
- private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
- private final InfoChannel infoChannel;
+ private final ChannelTaskExecutor infoChannelTaskExecutor;
+ private final ChannelTaskExecutor dataChannelTaskExecutor;
- public void executeConsumerTasks()
+ public void executeChannels()
{
- infoChannelConsumerTaskExecutor.executeConsumerTask();
- dataChannelConsumerTaskExecutor.executeConsumerTask();
+ infoChannelTaskExecutor.executeChannelTask();
+ dataChannelTaskExecutor.executeChannelTask();
}
- public void joinConsumerTasks() throws InterruptedException
+ public void joinChannels() throws InterruptedException
{
- dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
- while (infoChannel.isLoadInProgress())
+ joinChannel(dataChannelTaskExecutor);
+ joinChannel(infoChannelTaskExecutor);
+ }
+
+ private void joinChannel(
+ ChannelTaskExecutor channelTaskExecutor)
+ throws InterruptedException
+ {
+ Channel channel = channelTaskExecutor.getChannel();
+ while (channel.getChannelState() != ChannelState.SHUTTING_DOWN)
{
- log.info("Waiting for {} to finish loading...", infoChannel);
+ log.info("Waiting for {} to shut down...", channel);
Thread.sleep(1000);
}
- infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ channelTaskExecutor.join();
}
}