NEU
[demos/kafka/chat] / src / test / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaServicesConfiguration.java
index efc86b9..a3dddb1 100644 (file)
@@ -4,8 +4,6 @@ import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -16,21 +14,17 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.time.Clock;
 import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
 
 
 @ConditionalOnProperty(
@@ -38,61 +32,8 @@ import java.util.concurrent.CompletableFuture;
     name = "services",
     havingValue = "kafka")
 @Configuration
-@Slf4j
-public class KafkaServicesConfiguration implements ApplicationRunner
+public class KafkaServicesConfiguration
 {
-  @Autowired
-  ThreadPoolTaskExecutor taskExecutor;
-  @Autowired
-  ConfigurableApplicationContext context;
-
-  @Autowired
-  ChatMessageChannel chatMessageChannel;
-  @Autowired
-  ChatRoomChannel chatRoomChannel;
-
-  CompletableFuture<Void> chatRoomChannelConsumerJob;
-  CompletableFuture<Void> chatMessageChannelConsumerJob;
-
-
-  @Override
-  public void run(ApplicationArguments args) throws Exception
-  {
-    log.info("Starting the consumer for the ChatRoomChannel");
-    chatRoomChannelConsumerJob = taskExecutor
-        .submitCompletable(chatRoomChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
-          return null;
-        });
-    log.info("Starting the consumer for the ChatMessageChannel");
-    chatMessageChannelConsumerJob = taskExecutor
-        .submitCompletable(chatMessageChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
-          return null;
-        });
-  }
-
-  @PreDestroy
-  public void joinChatRoomChannelConsumerJob()
-  {
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    chatRoomChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
-  }
-
-  @PreDestroy
-  public void joinChatMessageChannelConsumerJob()
-  {
-    log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
-    chatMessageChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatMessageChannel");
-  }
-
-
   @Bean
   ChatHome kafkaChatHome(
       ShardingStrategy shardingStrategy,
@@ -129,11 +70,17 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   @Bean
   Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
       IntegerSerializer integerSerializer,
       JsonSerializer<ChatRoomTo> chatRoomSerializer)
   {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
     return new KafkaProducer<>(
-        defaultProducerProperties,
+        properties,
         integerSerializer,
         chatRoomSerializer);
   }
@@ -148,17 +95,25 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   JsonSerializer<ChatRoomTo> chatRoomSerializer()
   {
     JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        false);
     return serializer;
   }
 
   @Bean
   Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
       IntegerDeserializer integerDeserializer,
       JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
   {
-    Properties properties = new Properties(defaultConsumerProperties);
-    properties.setProperty(
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+    properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
         "chat_room_channel");
     return new KafkaConsumer<>(
@@ -177,6 +132,12 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
   {
     JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+        false );
     return deserializer;
   }
 
@@ -204,11 +165,17 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   @Bean
   Producer<String, MessageTo>  chatMessageChannelProducer(
       Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
       JsonSerializer<MessageTo> messageSerializer)
   {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
     return new KafkaProducer<>(
-        defaultProducerProperties,
+        properties,
         stringSerializer,
         messageSerializer);
   }
@@ -223,17 +190,25 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   JsonSerializer<MessageTo> chatMessageSerializer()
   {
     JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        false);
     return serializer;
   }
 
   @Bean
   Consumer<String, MessageTo>  chatMessageChannelConsumer(
       Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
       JsonDeserializer<MessageTo> messageDeserializer)
   {
-    Properties properties = new Properties(defaultConsumerProperties);
-    properties.setProperty(
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
+    properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
         "chat_message_channel");
     return new KafkaConsumer<>(
@@ -252,6 +227,12 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   JsonDeserializer<MessageTo> chatMessageDeserializer()
   {
     JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
+        false );
     return deserializer;
   }
 
@@ -259,9 +240,6 @@ public class KafkaServicesConfiguration implements ApplicationRunner
   Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
   {
     Properties properties = new Properties();
-    properties.setProperty(
-        ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientId());
     properties.setProperty(
         ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
         chatBackendProperties.getKafka().getBootstrapServers());
@@ -275,9 +253,6 @@ public class KafkaServicesConfiguration implements ApplicationRunner
     properties.setProperty(
         ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
         chatBackendProperties.getKafka().getBootstrapServers());
-    properties.setProperty(
-        ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientId());
     properties.setProperty(
         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
         "false");