import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import jakarta.annotation.PreDestroy;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
public class ChannelExecutor
{
private final ThreadPoolTaskExecutor taskExecutor;
- private final Runnable consumerTask;
+ @Getter
+ private final Channel channel;
private final Consumer<String, AbstractMessageTo> consumer;
private final WorkAssignor workAssignor;
public void executeConsumerTask()
{
workAssignor.assignWork(consumer);
- log.info("Starting the consumer-task for {}", consumerTask);
+ log.info("Starting the consumer-task for {}", channel);
consumerTaskJob = taskExecutor
- .submitCompletable(consumerTask)
+ .submitCompletable(channel)
.exceptionally(e ->
{
- log.error("The consumer-task for {} exited abnormally!", consumerTask, e);
+ log.error("The consumer-task for {} exited abnormally!", channel, e);
return null;
});
}
@PreDestroy
public void joinConsumerTaskJob()
{
- log.info("Signaling the consumer-task for {} to quit its work", consumerTask);
+ log.info("Signaling the consumer-task for {} to quit its work", channel);
consumer.wakeup();
- log.info("Waiting for the consumer of {} to finish its work", consumerTask);
+ log.info("Waiting for the consumer of {} to finish its work", channel);
consumerTaskJob.join();
- log.info("Joined the consumer-task for {}", consumerTask);
+ log.info("Joined the consumer-task for {}", channel);
}
}
@Slf4j
public class ChannelRunner
{
- private final ChannelExecutor infoChannelChannelExecutor;
- private final ChannelExecutor dataChannelChannelExecutor;
- private final InfoChannel infoChannel;
+ private final ChannelExecutor infoChannelExecutor;
+ private final ChannelExecutor dataChannelExecutor;
- public void executeConsumerTasks()
+ public void executeChannel()
{
- infoChannelChannelExecutor.executeConsumerTask();
- dataChannelChannelExecutor.executeConsumerTask();
+ infoChannelExecutor.executeConsumerTask();
+ dataChannelExecutor.executeConsumerTask();
}
- public void joinConsumerTasks() throws InterruptedException
+ public void joinChannel() throws InterruptedException
{
- dataChannelChannelExecutor.joinConsumerTaskJob();
+ dataChannelExecutor.joinConsumerTaskJob();
while (infoChannel.getChannelState() != ChannelState.SHUTTING_DOWN)
{
log.info("Waiting for {} to shut down...", infoChannel);
Thread.sleep(1000);
}
- infoChannelChannelExecutor.joinConsumerTaskJob();
+ infoChannelExecutor.joinConsumerTaskJob();
}
+
+ private void joinChannel()
}
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.Producer;
import java.util.stream.IntStream;
+@ToString(of = { "topic", "instanceId" })
@Slf4j
-public class DataChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Channel, ConsumerRebalanceListener
{
private final String instanceId;
private final String topic;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.stream.IntStream;
+@ToString(of = { "topic", "instanceUri" })
@Slf4j
-public class InfoChannel implements Runnable
+public class InfoChannel implements Channel
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
@Override
public void run(ApplicationArguments args)
{
- channelRunner.executeConsumerTasks();
+ channelRunner.executeChannel();
}
@PreDestroy
public void joinConsumerTasks() throws InterruptedException
{
- channelRunner.joinConsumerTasks();
+ channelRunner.joinChannel();
}
}
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
send(messageTemplate, dataTopic, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
- channelRunner.executeConsumerTasks();
+ channelRunner.executeChannel();
}
private static void send(
public static void joinConsumerTasks(ChannelRunner channelRunner) throws InterruptedException
{
- channelRunner.joinConsumerTasks();
+ channelRunner.joinChannel();
}
}