From: Kai Moritz Date: Sun, 20 Aug 2023 08:09:18 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2023-08-20~12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=abebe3533a4fe69f0fa76c8c3a070b2c53ffeda7;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 1308946e..275224d4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -51,7 +51,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener Clock clock) { log.debug( - "Creating ChatMessageChannel for topic {} with {} partitions", + "Creating ChatRoomChannel for topic {} with {} partitions", topic, numShards); this.topic = topic; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index fac35825..fec48b0c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -30,31 +30,31 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Autowired ChatRoomChannel chatRoomChannel; @Autowired - Consumer chatMessageChannelConsumer; + Consumer chatRoomChannelConsumer; - CompletableFuture chatMessageChannelConsumerJob; + CompletableFuture chatRoomChannelConsumerJob; @Override public void run(ApplicationArguments args) throws Exception { - log.info("Starting the consumer for the ChatMessageChannel"); - chatMessageChannelConsumerJob = taskExecutor + log.info("Starting the consumer for the ChatRoomChannel"); + chatRoomChannelConsumerJob = taskExecutor .submitCompletable(chatRoomChannel) .exceptionally(e -> { - log.error("The consumer for the ChatMessageChannel exited abnormally!", e); + log.error("The consumer for the ChatRoomChannel exited abnormally!", e); return null; }); } @PreDestroy - public void joinChatMessageChannelConsumerJob() + public void joinChatRoomChannelConsumerJob() { log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatMessageChannelConsumer.wakeup(); - log.info("Waiting for the consumer of the ChatMessageChannel to finish its work"); - chatMessageChannelConsumerJob.join(); - log.info("Joined the consumer of the ChatMessageChannel"); + chatRoomChannelConsumer.wakeup(); + log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); + chatRoomChannelConsumerJob.join(); + log.info("Joined the consumer of the ChatRoomChannel"); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 724739bf..e8c3f0d8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -52,17 +52,17 @@ public class KafkaServicesConfiguration } @Bean - ChatRoomChannel chatMessageChannel( + ChatRoomChannel chatRoomChannel( ChatBackendProperties properties, - Producer chatMessageChannelProducer, - Consumer chatMessageChannelConsumer, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, ZoneId zoneId, Clock clock) { return new ChatRoomChannel( properties.getKafka().getMessageChannelTopic(), - chatMessageChannelProducer, - chatMessageChannelConsumer, + chatRoomChannelProducer, + chatRoomChannelConsumer, zoneId, properties.getKafka().getNumPartitions(), properties.getChatroomBufferSize(), @@ -70,7 +70,7 @@ public class KafkaServicesConfiguration } @Bean - Producer chatMessageChannelProducer( + Producer chatRoomChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, @@ -105,7 +105,7 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatMessageChannelConsumer( + Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer,