]> juplo.de Git - demos/kafka/chat/commitdiff
WIP
authorKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:09:18 +0000 (10:09 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:48:27 +0000 (10:48 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java

index 1308946ef9cd8c5e0311d04fd41e4d1659822f42..275224d4524a57f61196d784873a3ad67b15d9f2 100644 (file)
@@ -51,7 +51,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     Clock clock)
   {
     log.debug(
-        "Creating ChatMessageChannel for topic {} with {} partitions",
+        "Creating ChatRoomChannel for topic {} with {} partitions",
         topic,
         numShards);
     this.topic = topic;
index fac358251acff174efb7c476e41b13d1ebcd6d02..fec48b0c4e64de77ca97dff50f9158e37042ce60 100644 (file)
@@ -30,31 +30,31 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Autowired
   ChatRoomChannel chatRoomChannel;
   @Autowired
-  Consumer<String, AbstractMessageTo> chatMessageChannelConsumer;
+  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
 
-  CompletableFuture<Void> chatMessageChannelConsumerJob;
+  CompletableFuture<Void> chatRoomChannelConsumerJob;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-    log.info("Starting the consumer for the ChatMessageChannel");
-    chatMessageChannelConsumerJob = taskExecutor
+    log.info("Starting the consumer for the ChatRoomChannel");
+    chatRoomChannelConsumerJob = taskExecutor
         .submitCompletable(chatRoomChannel)
         .exceptionally(e ->
         {
-          log.error("The consumer for the ChatMessageChannel exited abnormally!", e);
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
           return null;
         });
   }
 
   @PreDestroy
-  public void joinChatMessageChannelConsumerJob()
+  public void joinChatRoomChannelConsumerJob()
   {
     log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatMessageChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatMessageChannel to finish its work");
-    chatMessageChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatMessageChannel");
+    chatRoomChannelConsumer.wakeup();
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    chatRoomChannelConsumerJob.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
   }
 }
index 724739bf2c2155c91d0b1a5ee799d4451b48ce43..e8c3f0d84ecbedb1aa1ad1291c4d843f3ad22b2a 100644 (file)
@@ -52,17 +52,17 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  ChatRoomChannel chatMessageChannel(
+  ChatRoomChannel chatRoomChannel(
       ChatBackendProperties properties,
-      Producer<String, AbstractMessageTo> chatMessageChannelProducer,
-      Consumer<String, AbstractMessageTo> chatMessageChannelConsumer,
+      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
       ZoneId zoneId,
       Clock clock)
   {
     return new ChatRoomChannel(
         properties.getKafka().getMessageChannelTopic(),
-        chatMessageChannelProducer,
-        chatMessageChannelConsumer,
+        chatRoomChannelProducer,
+        chatRoomChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
@@ -70,7 +70,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, AbstractMessageTo>  chatMessageChannelProducer(
+  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
@@ -105,7 +105,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<String, AbstractMessageTo>  chatMessageChannelConsumer(
+  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,