@RequiredArgsConstructor
@Slf4j
-public class ChannelExecutor
+public class ChannelTaskExecutor
{
private final ThreadPoolTaskExecutor taskExecutor;
@Getter
@RequiredArgsConstructor
@Slf4j
-public class ChannelRunner
+public class ChannelTaskRunner
{
- private final ChannelExecutor infoChannelExecutor;
- private final ChannelExecutor dataChannelExecutor;
+ private final ChannelTaskExecutor infoChannelTaskExecutor;
+ private final ChannelTaskExecutor dataChannelTaskExecutor;
public void executeChannel()
{
- infoChannelExecutor.executeConsumerTask();
- dataChannelExecutor.executeConsumerTask();
+ infoChannelTaskExecutor.executeConsumerTask();
+ dataChannelTaskExecutor.executeConsumerTask();
}
public void joinChannel() throws InterruptedException
{
- dataChannelExecutor.joinConsumerTaskJob();
+ dataChannelTaskExecutor.joinConsumerTaskJob();
while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
{
log.info("Waiting for {} to shut down...", infoChannel);
Thread.sleep(1000);
}
- infoChannelExecutor.joinConsumerTaskJob();
+ infoChannelTaskExecutor.joinConsumerTaskJob();
}
private void joinChannel()
@RequiredArgsConstructor
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
- private final ChannelRunner channelRunner;
+ private final ChannelTaskRunner channelTaskRunner;
@Override
public void run(ApplicationArguments args)
{
- channelRunner.executeChannel();
+ channelTaskRunner.executeChannel();
}
@PreDestroy
public void joinConsumerTasks() throws InterruptedException
{
- channelRunner.joinChannel();
+ channelTaskRunner.joinChannel();
}
}
public class KafkaServicesConfiguration
{
@Bean
- ChannelRunner consumerTaskRunner(
- ChannelExecutor infoChannelChannelExecutor,
- ChannelExecutor dataChannelChannelExecutor,
+ ChannelTaskRunner consumerTaskRunner(
+ ChannelTaskExecutor infoChannelChannelTaskExecutor,
+ ChannelTaskExecutor dataChannelChannelTaskExecutor,
InfoChannel infoChannel)
{
- return new ChannelRunner(
- infoChannelChannelExecutor,
- dataChannelChannelExecutor,
+ return new ChannelTaskRunner(
+ infoChannelChannelTaskExecutor,
+ dataChannelChannelTaskExecutor,
infoChannel);
}
@Bean
- ChannelExecutor infoChannelConsumerTaskExecutor(
+ ChannelTaskExecutor infoChannelConsumerTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
InfoChannel infoChannel,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
WorkAssignor infoChannelWorkAssignor)
{
- return new ChannelExecutor(
+ return new ChannelTaskExecutor(
taskExecutor,
infoChannel,
infoChannelConsumer,
}
@Bean
- ChannelExecutor dataChannelConsumerTaskExecutor(
+ ChannelTaskExecutor dataChannelConsumerTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
DataChannel dataChannel,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
WorkAssignor dataChannelWorkAssignor)
{
- return new ChannelExecutor(
+ return new ChannelTaskExecutor(
taskExecutor,
dataChannel,
dataChannelConsumer,
@BeforeAll
public static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ChannelRunner channelRunner)
+ @Autowired ChannelTaskRunner channelTaskRunner)
{
KafkaTestUtils.sendAndLoadStoredData(
messageTemplate,
INFO_TOPIC,
DATA_TOPIC,
- channelRunner);
+ channelTaskRunner);
}
@AfterAll
static void joinConsumerTasks(
- @Autowired ChannelRunner channelRunner)
+ @Autowired ChannelTaskRunner channelTaskRunner)
throws InterruptedException
{
- KafkaTestUtils.joinConsumerTasks(channelRunner);
+ KafkaTestUtils.joinConsumerTasks(channelTaskRunner);
}
@BeforeAll
static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ChannelRunner channelRunner)
+ @Autowired ChannelTaskRunner channelTaskRunner)
{
KafkaTestUtils.sendAndLoadStoredData(
messageTemplate,
INFO_TOPIC,
DATA_TOPIC,
- channelRunner);
+ channelTaskRunner);
}
@AfterAll
static void joinConsumerTasks(
- @Autowired ChannelRunner channelRunner)
+ @Autowired ChannelTaskRunner channelTaskRunner)
throws InterruptedException
{
- KafkaTestUtils.joinConsumerTasks(channelRunner);
+ KafkaTestUtils.joinConsumerTasks(channelTaskRunner);
}
}
KafkaTemplate<String, String> messageTemplate,
String infoTopic,
String dataTopic,
- ChannelRunner channelRunner)
+ ChannelTaskRunner channelTaskRunner)
{
send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- channelRunner.executeChannel();
+ channelTaskRunner.executeChannel();
}
private static void send(
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
- public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException
+ public static void joinConsumerTasks(ChannelTaskRunner channelTaskRunner) throws InterruptedException
{
- channelRunner.joinChannel();
+ channelTaskRunner.joinChannel();
}
}