package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
+import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.concurrent.ListenableFuture;
import java.time.Clock;
import java.time.ZoneId;
-import java.util.Optional;
+import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@Autowired
ChatMessageChannel chatMessageChannel;
+ @Autowired
+ ChatRoomChannel chatRoomChannel;
- CompletableFuture<Optional<Exception>> chatRoomChannelConsumerJob;
- CompletableFuture<Optional<Exception>> chatMessageChannelConsumerJob;
+ CompletableFuture<Void> chatRoomChannelConsumerJob;
+ CompletableFuture<Void> chatMessageChannelConsumerJob;
@Override
public void run(ApplicationArguments args) throws Exception
{
log.info("Starting the consumer for the ChatRoomChannel");
- chatRoomChannelConsumerJob = taskExecutor.submitCompletable(chatMessageChannel);
- chatRoomChannelConsumerJob.thenAccept(exceptionOptional ->
- {
- exceptionOptional.ifPresent();
- log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
- SpringApplication.exit(context, () -> exitStatus);
- },
- t ->
+ chatRoomChannelConsumerJob = taskExecutor
+ .submitCompletable(chatRoomChannel)
+ .exceptionally(e ->
+ {
+ log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+ return null;
+ });
+ log.info("Starting the consumer for the ChatMessageChannel");
+ chatMessageChannelConsumerJob = taskExecutor
+ .submitCompletable(chatMessageChannel)
+ .exceptionally(e ->
{
- log.error("SimpleConsumer exited abnormally!", t);
- SpringApplication.exit(context, () -> 2);
+ log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
+ return null;
});
}
@PreDestroy
- public void shutdown() throws ExecutionException, InterruptedException
+ public void joinChatRoomChannelConsumerJob()
{
- log.info("Signaling SimpleConsumer to quit its work");
- kafkaConsumer.wakeup();
- log.info("Waiting for SimpleConsumer to finish its work");
- consumerJob.get();
- log.info("SimpleConsumer finished its work");
+ log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+ chatRoomChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatRoomChannel");
+ }
+
+ @PreDestroy
+ public void joinChatMessageChannelConsumerJob()
+ {
+ log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
+ chatMessageChannelConsumerJob.join();
+ log.info("Joined the consumer of the ChatMessageChannel");
}
properties.getChatroomBufferSize());
}
+ @Bean
+ Producer<Integer, ChatRoomTo> chatRoomChannelProducer(
+ Properties defaultProducerProperties,
+ IntegerSerializer integerSerializer,
+ JsonSerializer<ChatRoomTo> chatRoomSerializer)
+ {
+ return new KafkaProducer<>(
+ defaultProducerProperties,
+ integerSerializer,
+ chatRoomSerializer);
+ }
+
+ @Bean
+ IntegerSerializer integerSerializer()
+ {
+ return new IntegerSerializer();
+ }
+
+ @Bean
+ JsonSerializer<ChatRoomTo> chatRoomSerializer()
+ {
+ JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+ return serializer;
+ }
+
+ @Bean
+ Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer(
+ Properties defaultConsumerProperties,
+ IntegerDeserializer integerDeserializer,
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+ {
+ Properties properties = new Properties(defaultConsumerProperties);
+ properties.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_room_channel");
+ return new KafkaConsumer<>(
+ properties,
+ integerDeserializer,
+ chatRoomDeserializer);
+ }
+
+ @Bean
+ IntegerDeserializer integerDeserializer()
+ {
+ return new IntegerDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+ {
+ JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+ return deserializer;
+ }
+
+ @Bean
+ ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ }
+
@Bean
ChatMessageChannel chatMessageChannel(
ChatBackendProperties properties,
}
@Bean
- ShardingStrategy shardingStrategy(ChatBackendProperties properties)
+ Producer<String, MessageTo> chatMessageChannelProducer(
+ Properties defaultProducerProperties,
+ StringSerializer stringSerializer,
+ JsonSerializer<MessageTo> messageSerializer)
{
- return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions());
+ return new KafkaProducer<>(
+ defaultProducerProperties,
+ stringSerializer,
+ messageSerializer);
+ }
+
+ @Bean
+ StringSerializer stringSerializer()
+ {
+ return new StringSerializer();
+ }
+
+ @Bean
+ JsonSerializer<MessageTo> chatMessageSerializer()
+ {
+ JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+ return serializer;
+ }
+
+ @Bean
+ Consumer<String, MessageTo> chatMessageChannelConsumer(
+ Properties defaultConsumerProperties,
+ StringDeserializer stringDeserializer,
+ JsonDeserializer<MessageTo> messageDeserializer)
+ {
+ Properties properties = new Properties(defaultConsumerProperties);
+ properties.setProperty(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ "chat_message_channel");
+ return new KafkaConsumer<>(
+ properties,
+ stringDeserializer,
+ messageDeserializer);
+ }
+
+ @Bean
+ StringDeserializer stringDeserializer()
+ {
+ return new StringDeserializer();
+ }
+
+ @Bean
+ JsonDeserializer<MessageTo> chatMessageDeserializer()
+ {
+ JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+ return deserializer;
+ }
+
+ @Bean
+ Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ProducerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientId());
+ properties.setProperty(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ return properties;
+ }
+
+ @Bean
+ Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ chatBackendProperties.getKafka().getBootstrapServers());
+ properties.setProperty(
+ ConsumerConfig.CLIENT_ID_CONFIG,
+ chatBackendProperties.getKafka().getClientId());
+ properties.setProperty(
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+ "false");
+ properties.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ "earliest");
+ return properties;
}
@Bean