{
private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+ private final InfoChannel infoChannel;
public void executeConsumerTasks()
{
dataChannelConsumerTaskExecutor.executeConsumerTask();
}
- public void joinConsumerTasks()
+ public void joinConsumerTasks() throws InterruptedException
{
dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ while (infoChannel.loadInProgress())
+ {
+ log.info("Waiting for {} to finish loading...", infoChannel);
+ Thread.sleep(1000);
+ }
infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
}
}
}
@PreDestroy
- public void joinConsumerTasks()
+ public void joinConsumerTasks() throws InterruptedException
{
consumerTaskRunner.joinConsumerTasks();
}
@Bean
ConsumerTaskRunner consumerTaskRunner(
ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
- ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+ InfoChannel infoChannel)
{
return new ConsumerTaskRunner(
infoChannelConsumerTaskExecutor,
- dataChannelConsumerTaskExecutor);
+ dataChannelConsumerTaskExecutor,
+ infoChannel);
}
@Bean
}
@AfterAll
- static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+ static void joinConsumerTasks(
+ @Autowired ConsumerTaskRunner consumerTaskRunner)
+ throws InterruptedException
{
KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
}
}
@AfterAll
- static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+ static void joinConsumerTasks(
+ @Autowired ConsumerTaskRunner consumerTaskRunner)
+ throws InterruptedException
{
KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
}
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
- public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
+ public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
{
consumerTaskRunner.joinConsumerTasks();
}