From: Kai Moritz Date: Sun, 20 Aug 2023 10:57:26 +0000 (+0200) Subject: refactor: Moved implementation details out of `domain` -- Moved classes X-Git-Tag: rebase--2023-08-22~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f1dfc516a5822481540d70748c987b5c19e165e9;p=demos%2Fkafka%2Fchat refactor: Moved implementation details out of `domain` -- Moved classes --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java deleted file mode 100644 index 19ff4aa4..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.UUID; - - -public interface ChatHomeService -{ - Mono getChatRoom(int shard, UUID id); - Flux getChatRooms(int shard); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java deleted file mode 100644 index 6d2f0794..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ /dev/null @@ -1,62 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - - -@Slf4j -public class ShardedChatHome implements ChatHome -{ - private final ChatHome[] chatHomes; - private final Set ownedShards; - private final ShardingStrategy shardingStrategy; - - - public ShardedChatHome( - ChatHome[] chatHomes, - ShardingStrategy shardingStrategy) - { - this.chatHomes = chatHomes; - this.shardingStrategy = shardingStrategy; - this.ownedShards = new HashSet<>(); - for (int shard = 0; shard < chatHomes.length; shard++) - if(chatHomes[shard] != null) - this.ownedShards.add(shard); - log.info( - "Created ShardedChatHome for shards: {}", - ownedShards - .stream() - .map(String::valueOf) - .collect(Collectors.joining(", "))); - } - - - @Override - public Mono getChatRoom(UUID id) - { - int shard = selectShard(id); - if (chatHomes[shard] == null) - throw new ShardNotOwnedException(shard); - return chatHomes[shard].getChatRoom(id); - } - - @Override - public Flux getChatRooms() - { - return Flux - .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRooms()); - } - - - private int selectShard(UUID chatroomId) - { - return shardingStrategy.selectShard(chatroomId); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingStrategy.java deleted file mode 100644 index dde0e543..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingStrategy.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import java.util.UUID; - - -public interface ShardingStrategy -{ - int selectShard(UUID chatRoomId); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java deleted file mode 100644 index 11542edd..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; - - -@Slf4j -public class SimpleChatHome implements ChatHome -{ - private final ChatHomeService service; - private final int shard; - - - public SimpleChatHome(ChatHomeService service, int shard) - { - log.info("Created SimpleChatHome for shard {}", shard); - this.service = service; - this.shard = shard; - } - - public SimpleChatHome(ChatHomeService service) - { - this(service, 0); - } - - - @Override - public Mono getChatRoom(UUID id) - { - return service - .getChatRoom(shard, id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); - } - - @Override - public Flux getChatRooms() - { - return service.getChatRooms(shard); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/KafkaLikeShardingStrategy.java deleted file mode 100644 index 41fd9cd0..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/KafkaLikeShardingStrategy.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence; - -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.common.utils.Utils; - -import java.util.UUID; - - -@RequiredArgsConstructor -public class KafkaLikeShardingStrategy implements ShardingStrategy -{ - private final int numPartitions; - - @Override - public int selectShard(UUID chatRoomId) - { - byte[] serializedKey = chatRoomId.toString().getBytes(); - return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ChatHomeService.java new file mode 100644 index 00000000..19ff4aa4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ChatHomeService.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.chat.backend.domain; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +public interface ChatHomeService +{ + Mono getChatRoom(int shard, UUID id); + Flux getChatRooms(int shard); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java new file mode 100644 index 00000000..41fd9cd0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.utils.Utils; + +import java.util.UUID; + + +@RequiredArgsConstructor +public class KafkaLikeShardingStrategy implements ShardingStrategy +{ + private final int numPartitions; + + @Override + public int selectShard(UUID chatRoomId) + { + byte[] serializedKey = chatRoomId.toString().getBytes(); + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java new file mode 100644 index 00000000..6d2f0794 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java @@ -0,0 +1,62 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + + +@Slf4j +public class ShardedChatHome implements ChatHome +{ + private final ChatHome[] chatHomes; + private final Set ownedShards; + private final ShardingStrategy shardingStrategy; + + + public ShardedChatHome( + ChatHome[] chatHomes, + ShardingStrategy shardingStrategy) + { + this.chatHomes = chatHomes; + this.shardingStrategy = shardingStrategy; + this.ownedShards = new HashSet<>(); + for (int shard = 0; shard < chatHomes.length; shard++) + if(chatHomes[shard] != null) + this.ownedShards.add(shard); + log.info( + "Created ShardedChatHome for shards: {}", + ownedShards + .stream() + .map(String::valueOf) + .collect(Collectors.joining(", "))); + } + + + @Override + public Mono getChatRoom(UUID id) + { + int shard = selectShard(id); + if (chatHomes[shard] == null) + throw new ShardNotOwnedException(shard); + return chatHomes[shard].getChatRoom(id); + } + + @Override + public Flux getChatRooms() + { + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRooms()); + } + + + private int selectShard(UUID chatroomId) + { + return shardingStrategy.selectShard(chatroomId); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardingStrategy.java new file mode 100644 index 00000000..dde0e543 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardingStrategy.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.chat.backend.domain; + +import java.util.UUID; + + +public interface ShardingStrategy +{ + int selectShard(UUID chatRoomId); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java new file mode 100644 index 00000000..11542edd --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -0,0 +1,43 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +@Slf4j +public class SimpleChatHome implements ChatHome +{ + private final ChatHomeService service; + private final int shard; + + + public SimpleChatHome(ChatHomeService service, int shard) + { + log.info("Created SimpleChatHome for shard {}", shard); + this.service = service; + this.shard = shard; + } + + public SimpleChatHome(ChatHomeService service) + { + this(service, 0); + } + + + @Override + public Mono getChatRoom(UUID id) + { + return service + .getChatRoom(shard, id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + @Override + public Flux getChatRooms() + { + return service.getChatRooms(shard); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java deleted file mode 100644 index 5b53607f..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java +++ /dev/null @@ -1,57 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Mono; - -import java.time.Clock; -import java.util.UUID; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static pl.rzrz.assertj.reactor.Assertions.assertThat; - - -public class SimpleChatHomeTest -{ - @Test - @DisplayName("Assert chatroom is delivered, if it exists") - void testGetExistingChatroom() - { - // Given - ChatHomeService chatHomeService = mock(ChatHomeService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, - Clock.systemDefaultZone(), - mock(ChatRoomService.class), - 8); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); - SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); - - // When - Mono mono = chatHome.getChatRoom(chatRoom.getId()); - - // Then - assertThat(mono).emitsExactly(chatRoom); - } - - @Test - @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist") - void testGetNonExistentChatroom() - { - // Given - ChatHomeService chatHomeService = mock(ChatHomeService.class); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); - SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); - - // When - Mono mono = chatHome.getChatRoom(UUID.randomUUID()); - - // Then - assertThat(mono).sendsError(); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java new file mode 100644 index 00000000..5b53607f --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java @@ -0,0 +1,57 @@ +package de.juplo.kafka.chat.backend.domain; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import java.time.Clock; +import java.util.UUID; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static pl.rzrz.assertj.reactor.Assertions.assertThat; + + +public class SimpleChatHomeTest +{ + @Test + @DisplayName("Assert chatroom is delivered, if it exists") + void testGetExistingChatroom() + { + // Given + ChatHomeService chatHomeService = mock(ChatHomeService.class); + ChatRoom chatRoom = new ChatRoom( + UUID.randomUUID(), + "Foo", + 0, + Clock.systemDefaultZone(), + mock(ChatRoomService.class), + 8); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); + SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); + + // When + Mono mono = chatHome.getChatRoom(chatRoom.getId()); + + // Then + assertThat(mono).emitsExactly(chatRoom); + } + + @Test + @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist") + void testGetNonExistentChatroom() + { + // Given + ChatHomeService chatHomeService = mock(ChatHomeService.class); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); + + // When + Mono mono = chatHome.getChatRoom(UUID.randomUUID()); + + // Then + assertThat(mono).sendsError(); + } +}