From 2a47c081a50f8ebdda6db9955c3ddc69e5c601c2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 19 Apr 2023 21:54:25 +0200 Subject: [PATCH] NEU --- .../persistence/kafka/ChatMessageChannel.java | 13 +-- .../persistence/kafka/ChatRoomChannel.java | 2 +- ...hatHomeService.java => KafkaChatHome.java} | 26 +++--- .../kafka/LoadInProgressException.java | 17 ++++ .../kafka/KafkaServicesConfiguration.java | 84 ++++++++++--------- 5 files changed, 81 insertions(+), 61 deletions(-) rename src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/{KafkaChatHomeService.java => KafkaChatHome.java} (53%) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index bff38ae2..69947a9d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -12,7 +12,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -29,8 +28,8 @@ import java.util.stream.IntStream; public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Consumer consumer; private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; private final boolean[] isShardOwned; @@ -46,8 +45,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener public ChatMessageChannel( String topic, - Consumer consumer, Producer producer, + Consumer consumer, ZoneId zoneId, int numShards) { @@ -270,8 +269,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener return Mono.justOrEmpty(chatrooms[shard].get(id)); } - Flux getChatRooms(int shard) + Flux getChatRooms() { - return Flux.fromStream(chatrooms[shard].values().stream()); + return Flux.fromStream(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> Integer.valueOf(shard)) + .flatMap(shard -> chatrooms[shard].values().stream())); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index ac44f997..f9568e70 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -23,8 +23,8 @@ import java.util.UUID; public class ChatRoomChannel implements Runnable { private final String topic; - private final Consumer consumer; private final Producer producer; + private final Consumer consumer; private final ShardingStrategy shardingStrategy; private final ChatMessageChannel chatMessageChannel; private final Clock clock; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java similarity index 53% rename from src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java rename to src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 38aecd18..88947a04 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -1,8 +1,10 @@ package de.juplo.kafka.chat.backend.persistence.kafka; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -10,25 +12,21 @@ import reactor.core.publisher.Mono; import java.util.*; +@RequiredArgsConstructor @Slf4j -public class KafkaChatHomeService implements ChatHomeService +public class KafkaChatHome implements ChatHome { + private final ShardingStrategy shardingStrategy; private final ChatMessageChannel chatMessageChanel; - public KafkaChatHomeService(ChatMessageChannel chatMessageChannel) - { - log.debug("Creating KafkaChatHomeService"); - this.chatMessageChanel = chatMessageChannel; - } - - @Override - public Mono getChatRoom(int shard, UUID id) + public Mono getChatRoom(UUID id) { + int shard = shardingStrategy.selectShard(id); if (chatMessageChanel.isLoadInProgress()) { - throw new ShardNotOwnedException(shard); + throw new LoadInProgressException(shard); } else { @@ -37,15 +35,15 @@ public class KafkaChatHomeService implements ChatHomeService } @Override - public Flux getChatRooms(int shard) + public Flux getChatRooms() { if (chatMessageChanel.isLoadInProgress()) { - throw new ShardNotOwnedException(shard); + throw new LoadInProgressException(); } else { - return chatMessageChanel.getChatRooms(shard); + return chatMessageChanel.getChatRooms(); } } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java new file mode 100644 index 00000000..83e06bd1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/LoadInProgressException.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; + + +public class LoadInProgressException extends ShardNotOwnedException +{ + public LoadInProgressException() + { + this(-1); + } + + public LoadInProgressException(int shard) + { + super(shard); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 91115ea5..55aa6f84 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -7,15 +7,17 @@ import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Clock; +import java.time.ZoneId; @ConditionalOnProperty( @@ -27,61 +29,61 @@ public class KafkaServicesConfiguration implements ApplicationRunner { @Bean ChatHome kafkaChatHome( - ChatBackendProperties properties, - KafkaChatHomeService chatHomeService) + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel) { - int numShards = properties.getInmemory().getNumShards(); - SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; - for (int shard = 0; shard < numShards; shard++) - { - - } - .read() - .subscribe(chatRoom -> - { - int shard = chatRoom.getShard(); - if (chatHomes[shard] == null) - chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); - }); - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHome(chatHomes, strategy); + return new KafkaChatHome(shardingStrategy, chatMessageChannel); } @Bean - KafkaChatHomeService kafkaChatHomeService(ChatBackendProperties properties) + KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) { - ShardingStrategyType sharding = - properties.getInmemory().getShardingStrategy(); - int numShards = sharding == ShardingStrategyType.none - ? 1 - : properties.getInmemory().getNumShards(); - int[] ownedShards = sharding == ShardingStrategyType.none - ? new int[] { 0 } - : properties.getInmemory().getOwnedShards(); - return new InMemoryChatHomeService( - numShards, - ownedShards, - storageStrategy.read()); + return new KafkaChatRoomFactory(chatRoomChannel); } @Bean - InMemoryChatRoomFactory chatRoomFactory( - InMemoryChatHomeService service, - ShardingStrategy strategy, - Clock clock, - ChatBackendProperties properties) + ChatRoomChannel chatRoomChannel( + ChatBackendProperties properties, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, + ShardingStrategy shardingStrategy, + ChatMessageChannel chatMessageChannel, + Clock clock) { - return new InMemoryChatRoomFactory( - service, - strategy, + return new ChatRoomChannel( + properties.getKafka().getTopic(), + chatRoomChannelProducer, + chatRoomChannelConsumer, + shardingStrategy, + chatMessageChannel, clock, properties.getChatroomBufferSize()); } @Bean - ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties) + ChatMessageChannel chatMessageChannel( + ChatBackendProperties properties, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, + ZoneId zoneId) { - return new KafkaLikeShardingStrategy( + return new ChatMessageChannel( + properties.getKafka().getTopic(), + chatMessageChannelProducer, + chatMessageChannelConsumer, + zoneId, properties.getKafka().getNumPartitions()); } + + @Bean + ShardingStrategy shardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy(properties.getKafka().getNumPartitions()); + } + + @Bean + ZoneId zoneId() + { + return ZoneId.systemDefault(); + } } -- 2.20.1