-package de.juplo.kafka.chat.backend.persistence.kafka;
+package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.time.Clock;
import java.time.ZoneId;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaServicesConfiguration
{
@Bean
- ChatHomeService kafkaChatHome(
+ KafkaServicesApplicationRunner kafkaServicesApplicationRunner(
+ ThreadPoolTaskExecutor taskExecutor,
+ ChatRoomChannel chatRoomChannel,
+ Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+ KafkaServicesApplicationRunner.WorkAssignor workAssignor)
+ {
+ return new KafkaServicesApplicationRunner(
+ taskExecutor,
+ chatRoomChannel,
+ chatRoomChannelConsumer,
+ workAssignor);
+ }
+
+ @Bean
+ KafkaServicesApplicationRunner.WorkAssignor workAssignor(
+ ChatBackendProperties properties,
+ ChatRoomChannel chatRoomChannel)
+ {
+ return consumer ->
+ {
+ List<String> topics =
+ List.of(properties.getKafka().getChatRoomChannelTopic());
+ consumer.subscribe(topics, chatRoomChannel);
+ };
+ }
+
+ @Bean
+ ChatHomeService kafkaChatHome(
ChatBackendProperties properties,
ChatRoomChannel chatRoomChannel)
{