From: Kai Moritz Date: Sun, 20 Aug 2023 09:39:41 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2023-08-20 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3a4811f4b5b7079f2dad63dcca99a8966516fcd4;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 0b35f8ce..7f93ad8b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -3,6 +3,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; @@ -238,7 +239,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - void loadChatRoom(ConsumerRecords records) + private void loadChatRoom(ConsumerRecords records) { for (ConsumerRecord record : records) { @@ -276,7 +277,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } } - void createChatRoom( + private void createChatRoom( UUID chatRoomId, CommandCreateChatRoomTo createChatRoomRequestTo, int partition) @@ -294,7 +295,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } - void createChatRoom(ChatRoomInfo chatRoomInfo) + private void createChatRoom(ChatRoomInfo chatRoomInfo) { UUID id = chatRoomInfo.getId(); String name = chatRoomInfo.getName(); @@ -305,7 +306,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener putChatRoom(chatRoom); } - void loadChatMessage( + private void loadChatMessage( UUID chatRoomId, LocalDateTime timestamp, long offset, @@ -322,7 +323,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener kafkaChatRoomService.persistMessage(message); } - boolean isLoadingCompleted() + private boolean isLoadingCompleted() { return IntStream .range(0, numShards) @@ -330,7 +331,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); } - void pauseAllOwnedPartions() + private void pauseAllOwnedPartions() { consumer.pause(IntStream .range(0, numShards) @@ -361,6 +362,16 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener Mono getChatRoom(int shard, UUID id) { + if (loadInProgress) + { + throw new LoadInProgressException(shard); + } + + if (!isShardOwned[shard]) + { + throw new ShardNotOwnedException(shard); + } + return Mono.justOrEmpty(chatrooms[shard].get(id)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 324e80bf..745abca6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -23,14 +23,7 @@ public class KafkaChatHome implements ChatHome public Mono getChatRoom(UUID id) { int shard = shardingStrategy.selectShard(id); - if (chatRoomChannel.isLoadInProgress()) - { - throw new LoadInProgressException(shard); - } - else - { - return chatRoomChannel.getChatRoom(shard, id); - } + return chatRoomChannel.getChatRoom(shard, id); } @Override