X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaServicesConfiguration.java;h=a3dddb1ba26869a473df9c7de8a3e68adacc9023;hb=4cf9f879647735d6f635bc6fb7930486a2cc6d72;hp=efc86b9e277314dd43e66381e7c07b95aa72c7f3;hpb=a574411f66c4e76fa547e004912716e37a5f2a01;p=demos%2Fkafka%2Fchat diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index efc86b9e..a3dddb1b 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -4,8 +4,6 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -16,21 +14,17 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.time.Clock; import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; -import java.util.concurrent.CompletableFuture; @ConditionalOnProperty( @@ -38,61 +32,8 @@ import java.util.concurrent.CompletableFuture; name = "services", havingValue = "kafka") @Configuration -@Slf4j -public class KafkaServicesConfiguration implements ApplicationRunner +public class KafkaServicesConfiguration { - @Autowired - ThreadPoolTaskExecutor taskExecutor; - @Autowired - ConfigurableApplicationContext context; - - @Autowired - ChatMessageChannel chatMessageChannel; - @Autowired - ChatRoomChannel chatRoomChannel; - - CompletableFuture chatRoomChannelConsumerJob; - CompletableFuture chatMessageChannelConsumerJob; - - - @Override - public void run(ApplicationArguments args) throws Exception - { - log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor - .submitCompletable(chatRoomChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); - return null; - }); - log.info("Starting the consumer for the ChatMessageChannel"); - chatMessageChannelConsumerJob = taskExecutor - .submitCompletable(chatMessageChannel) - .exceptionally(e -> - { - log.error("The consumer for the ChatMessageChannel exited abnormally!", e); - return null; - }); - } - - @PreDestroy - public void joinChatRoomChannelConsumerJob() - { - log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - chatRoomChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatRoomChannel"); - } - - @PreDestroy - public void joinChatMessageChannelConsumerJob() - { - log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); - chatMessageChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatMessageChannel"); - } - - @Bean ChatHome kafkaChatHome( ShardingStrategy shardingStrategy, @@ -129,11 +70,17 @@ public class KafkaServicesConfiguration implements ApplicationRunner @Bean Producer chatRoomChannelProducer( Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, IntegerSerializer integerSerializer, JsonSerializer chatRoomSerializer) { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); return new KafkaProducer<>( - defaultProducerProperties, + properties, integerSerializer, chatRoomSerializer); } @@ -148,17 +95,25 @@ public class KafkaServicesConfiguration implements ApplicationRunner JsonSerializer chatRoomSerializer() { JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); return serializer; } @Bean Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, IntegerDeserializer integerDeserializer, JsonDeserializer chatRoomDeserializer) { - Properties properties = new Properties(defaultConsumerProperties); - properties.setProperty( + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); + properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_room_channel"); return new KafkaConsumer<>( @@ -177,6 +132,12 @@ public class KafkaServicesConfiguration implements ApplicationRunner JsonDeserializer chatRoomDeserializer() { JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); return deserializer; } @@ -204,11 +165,17 @@ public class KafkaServicesConfiguration implements ApplicationRunner @Bean Producer chatMessageChannelProducer( Properties defaultProducerProperties, + ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, JsonSerializer messageSerializer) { + Map properties = new HashMap<>(); + defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); return new KafkaProducer<>( - defaultProducerProperties, + properties, stringSerializer, messageSerializer); } @@ -223,17 +190,25 @@ public class KafkaServicesConfiguration implements ApplicationRunner JsonSerializer chatMessageSerializer() { JsonSerializer serializer = new JsonSerializer<>(); + serializer.configure( + Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + false); return serializer; } @Bean Consumer chatMessageChannelConsumer( Properties defaultConsumerProperties, + ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, JsonDeserializer messageDeserializer) { - Properties properties = new Properties(defaultConsumerProperties); - properties.setProperty( + Map properties = new HashMap<>(); + defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); + properties.put( + ConsumerConfig.CLIENT_ID_CONFIG, + chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); + properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_message_channel"); return new KafkaConsumer<>( @@ -252,6 +227,12 @@ public class KafkaServicesConfiguration implements ApplicationRunner JsonDeserializer chatMessageDeserializer() { JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of( + JsonDeserializer.USE_TYPE_INFO_HEADERS, false, + JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), + false ); return deserializer; } @@ -259,9 +240,6 @@ public class KafkaServicesConfiguration implements ApplicationRunner Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties) { Properties properties = new Properties(); - properties.setProperty( - ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId()); properties.setProperty( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, chatBackendProperties.getKafka().getBootstrapServers()); @@ -275,9 +253,6 @@ public class KafkaServicesConfiguration implements ApplicationRunner properties.setProperty( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, chatBackendProperties.getKafka().getBootstrapServers()); - properties.setProperty( - ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientId()); properties.setProperty( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");