@RequiredArgsConstructor
@Slf4j
-public class ConsumerTaskExecutor
+public class ChannelExecutor
{
private final ThreadPoolTaskExecutor taskExecutor;
private final Runnable consumerTask;
@RequiredArgsConstructor
@Slf4j
-public class ConsumerTaskRunner
+public class ChannelRunner
{
- private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
- private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+ private final ChannelExecutor infoChannelChannelExecutor;
+ private final ChannelExecutor dataChannelChannelExecutor;
private final InfoChannel infoChannel;
public void executeConsumerTasks()
{
- infoChannelConsumerTaskExecutor.executeConsumerTask();
- dataChannelConsumerTaskExecutor.executeConsumerTask();
+ infoChannelChannelExecutor.executeConsumerTask();
+ dataChannelChannelExecutor.executeConsumerTask();
}
public void joinConsumerTasks() throws InterruptedException
{
- dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ dataChannelChannelExecutor.joinConsumerTaskJob();
while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
{
log.info("Waiting for {} to shut down...", infoChannel);
Thread.sleep(1000);
}
- infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ infoChannelChannelExecutor.joinConsumerTaskJob();
}
}
@RequiredArgsConstructor
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
- private final ConsumerTaskRunner consumerTaskRunner;
+ private final ChannelRunner channelRunner;
@Override
public void run(ApplicationArguments args)
{
- consumerTaskRunner.executeConsumerTasks();
+ channelRunner.executeConsumerTasks();
}
@PreDestroy
public void joinConsumerTasks() throws InterruptedException
{
- consumerTaskRunner.joinConsumerTasks();
+ channelRunner.joinConsumerTasks();
}
}
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.haproxy.HaproxyShardingPublisherStrategy;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
public class KafkaServicesConfiguration
{
@Bean
- ConsumerTaskRunner consumerTaskRunner(
- ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
- ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+ ChannelRunner consumerTaskRunner(
+ ChannelExecutor infoChannelChannelExecutor,
+ ChannelExecutor dataChannelChannelExecutor,
InfoChannel infoChannel)
{
- return new ConsumerTaskRunner(
- infoChannelConsumerTaskExecutor,
- dataChannelConsumerTaskExecutor,
+ return new ChannelRunner(
+ infoChannelChannelExecutor,
+ dataChannelChannelExecutor,
infoChannel);
}
@Bean
- ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
+ ChannelExecutor infoChannelConsumerTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
InfoChannel infoChannel,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
WorkAssignor infoChannelWorkAssignor)
{
- return new ConsumerTaskExecutor(
+ return new ChannelExecutor(
taskExecutor,
infoChannel,
infoChannelConsumer,
}
@Bean
- ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+ ChannelExecutor dataChannelConsumerTaskExecutor(
ThreadPoolTaskExecutor taskExecutor,
DataChannel dataChannel,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
WorkAssignor dataChannelWorkAssignor)
{
- return new ConsumerTaskExecutor(
+ return new ChannelExecutor(
taskExecutor,
dataChannel,
dataChannelConsumer,
@BeforeAll
public static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ConsumerTaskRunner consumerTaskRunner)
+ @Autowired ChannelRunner channelRunner)
{
KafkaTestUtils.sendAndLoadStoredData(
messageTemplate,
INFO_TOPIC,
DATA_TOPIC,
- consumerTaskRunner);
+ channelRunner);
}
@AfterAll
static void joinConsumerTasks(
- @Autowired ConsumerTaskRunner consumerTaskRunner)
+ @Autowired ChannelRunner channelRunner)
throws InterruptedException
{
- KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
+ KafkaTestUtils.joinConsumerTasks(channelRunner);
}
@BeforeAll
static void sendAndLoadStoredData(
@Autowired KafkaTemplate<String, String> messageTemplate,
- @Autowired ConsumerTaskRunner consumerTaskRunner)
+ @Autowired ChannelRunner channelRunner)
{
KafkaTestUtils.sendAndLoadStoredData(
messageTemplate,
INFO_TOPIC,
DATA_TOPIC,
- consumerTaskRunner);
+ channelRunner);
}
@AfterAll
static void joinConsumerTasks(
- @Autowired ConsumerTaskRunner consumerTaskRunner)
+ @Autowired ChannelRunner channelRunner)
throws InterruptedException
{
- KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
+ KafkaTestUtils.joinConsumerTasks(channelRunner);
}
}
KafkaTemplate<String, String> messageTemplate,
String infoTopic,
String dataTopic,
- ConsumerTaskRunner consumerTaskRunner)
+ ChannelRunner channelRunner)
{
send(messageTemplate, infoTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- consumerTaskRunner.executeConsumerTasks();
+ channelRunner.executeConsumerTasks();
}
private static void send(
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
- public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner) throws InterruptedException
+ public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException
{
- consumerTaskRunner.joinConsumerTasks();
+ channelRunner.joinConsumerTasks();
}
}