package de.juplo.kafka.chat.backend.implementation.kafka;
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class KafkaServicesApplicationRunner implements ApplicationRunner
{
+ private final String infoTopic;
private final ThreadPoolTaskExecutor taskExecutor;
+ private final InfoChannel infoChannel;
private final DataChannel dataChannel;
- private final Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+ private final Consumer<String, AbstractMessageTo> infoChannelConsumer;
+ private final Consumer<String, AbstractMessageTo> dataChannelConsumer;
private final WorkAssignor workAssignor;
CompletableFuture<Void> infoChannelConsumerJob;
@Override
public void run(ApplicationArguments args) throws Exception
{
- String infoTopic = properties.getKafka().getInfoChannelTopic();
List<TopicPartition> partitions = infoChannelConsumer
.partitionsFor(infoTopic)
.stream()
Thread.sleep(1000);
}
- workAssignor.assignWork(chatRoomChannelConsumer);
+ workAssignor.assignWork(dataChannelConsumer);
log.info("Starting the consumer for the DataChannel");
dataChannelConsumerJob = taskExecutor
.submitCompletable(dataChannel)
{
@Bean
KafkaServicesApplicationRunner kafkaServicesApplicationRunner(
+ ChatBackendProperties properties,
ThreadPoolTaskExecutor taskExecutor,
- ChatRoomChannel chatRoomChannel,
- Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+ InfoChannel infoChannel,
+ DataChannel dataChannel,
+ Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ Consumer<String, AbstractMessageTo> dataChannelConsumer,
KafkaServicesApplicationRunner.WorkAssignor workAssignor)
{
return new KafkaServicesApplicationRunner(
+ properties.getKafka().getInfoChannelTopic(),
taskExecutor,
- chatRoomChannel,
- chatRoomChannelConsumer,
+ infoChannel,
+ dataChannel,
+ infoChannelConsumer,
+ dataChannelConsumer,
workAssignor);
}
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.DefaultApplicationArguments;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
-<<<<<<< HEAD
-=======
- static CompletableFuture<Void> INFO_CHANNEL_CONSUMER_JOB;
- static CompletableFuture<Void> DATA_CHANNEL_CONSUMER_JOB;
-
->>>>>>> 5c8db7f (WIP)
@TestConfiguration
@EnableConfigurationProperties(ChatBackendProperties.class)
static class KafkaChatHomeTestConfiguration
{
@Bean
- KafkaServicesApplicationRunner.WorkAssignor workAssignor(
- ChatRoomChannel chatRoomChannel)
+ KafkaServicesApplicationRunner.WorkAssignor workAssignor(DataChannel dataChannel)
{
return consumer ->
{
List<TopicPartition> assignedPartitions =
- List.of(new TopicPartition(TOPIC, 2));
+ List.of(new TopicPartition(DATA_TOPIC, 2));
consumer.assign(assignedPartitions);
- chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+ dataChannel.onPartitionsAssigned(assignedPartitions);
};
}
@AfterAll
static void joinConsumerJob(@Autowired KafkaServicesApplicationRunner applicationRunner)
{
-<<<<<<< HEAD
applicationRunner.joinChatRoomChannelConsumerJob();
-=======
- log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
- chatRoomChannelConsumer.wakeup();
- log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
- DATA_CHANNEL_CONSUMER_JOB.join();
- log.info("Joined the consumer of the ChatRoomChannel");
->>>>>>> 5c8db7f (WIP)
}
}