WIP
authorKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:09:18 +0000 (10:09 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:48:27 +0000 (10:48 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index 1308946..275224d 100644 (file)
@@ -51,7 +51,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     Clock clock)
   {
     log.debug(
-        "Creating ChatMessageChannel for topic {} with {} partitions",
+        "Creating ChatRoomChannel for topic {} with {} partitions",
         topic,
         numShards);
     this.topic = topic;
index fac3582..fec48b0 100644 (file)
@@ -30,31 +30,31 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Autowired
   ChatRoomChannel chatRoomChannel;
   @Autowired
-  Consumer<String, AbstractMessageTo> chatMessageChannelConsumer;
+  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
 
-  CompletableFuture<Void> chatMessageChannelConsumerJob;
+  CompletableFuture<Void> chatRoomChannelConsumerJob;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-    log.info("Starting the consumer for the ChatMessageChannel");
-    chatMessageChannelConsumerJob = taskExecutor
+    log.info("Starting the consumer for the ChatRoomChannel");
+    chatRoomChannelConsumerJob = taskExecutor
         .submitCompletable(chatRoomChannel)
         .exceptionally(e ->
         {
-          log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
           return null;
         });
   }
 
   @PreDestroy
-  public void joinChatMessageChannelConsumerJob()
+  public void joinChatRoomChannelConsumerJob()
   {
     log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatMessageChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
-    chatMessageChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatMessageChannel");
+    chatRoomChannelConsumer.wakeup();
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    chatRoomChannelConsumerJob.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
   }
 }
index 724739b..e8c3f0d 100644 (file)
@@ -52,17 +52,17 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  ChatRoomChannel chatMessageChannel(
+  ChatRoomChannel chatRoomChannel(
       ChatBackendProperties properties,
-      Producer<String, AbstractMessageTo> chatMessageChannelProducer,
-      Consumer<String, AbstractMessageTo> chatMessageChannelConsumer,
+      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
       ZoneId zoneId,
       Clock clock)
   {
     return new ChatRoomChannel(
         properties.getKafka().getMessageChannelTopic(),
-        chatMessageChannelProducer,
-        chatMessageChannelConsumer,
+        chatRoomChannelProducer,
+        chatRoomChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
@@ -70,7 +70,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, AbstractMessageTo>  chatMessageChannelProducer(
+  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
@@ -105,7 +105,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<String, AbstractMessageTo>  chatMessageChannelConsumer(
+  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,