+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-public interface ChatHomeService
-{
- Mono<ChatRoom> getChatRoom(int shard, UUID id);
- Flux<ChatRoom> getChatRooms(int shard);
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class ShardedChatHome implements ChatHome
-{
- private final ChatHome[] chatHomes;
- private final Set<Integer> ownedShards;
- private final ShardingStrategy shardingStrategy;
-
-
- public ShardedChatHome(
- ChatHome[] chatHomes,
- ShardingStrategy shardingStrategy)
- {
- this.chatHomes = chatHomes;
- this.shardingStrategy = shardingStrategy;
- this.ownedShards = new HashSet<>();
- for (int shard = 0; shard < chatHomes.length; shard++)
- if(chatHomes[shard] != null)
- this.ownedShards.add(shard);
- log.info(
- "Created ShardedChatHome for shards: {}",
- ownedShards
- .stream()
- .map(String::valueOf)
- .collect(Collectors.joining(", ")));
- }
-
-
- @Override
- public Mono<ChatRoom> getChatRoom(UUID id)
- {
- int shard = selectShard(id);
- if (chatHomes[shard] == null)
- throw new ShardNotOwnedException(shard);
- return chatHomes[shard].getChatRoom(id);
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms()
- {
- return Flux
- .fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRooms());
- }
-
-
- private int selectShard(UUID chatroomId)
- {
- return shardingStrategy.selectShard(chatroomId);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import java.util.UUID;
-
-
-public interface ShardingStrategy
-{
- int selectShard(UUID chatRoomId);
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@Slf4j
-public class SimpleChatHome implements ChatHome
-{
- private final ChatHomeService service;
- private final int shard;
-
-
- public SimpleChatHome(ChatHomeService service, int shard)
- {
- log.info("Created SimpleChatHome for shard {}", shard);
- this.service = service;
- this.shard = shard;
- }
-
- public SimpleChatHome(ChatHomeService service)
- {
- this(service, 0);
- }
-
-
- @Override
- public Mono<ChatRoom> getChatRoom(UUID id)
- {
- return service
- .getChatRoom(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms()
- {
- return service.getChatRooms(shard);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-public class KafkaLikeShardingStrategy implements ShardingStrategy
-{
- private final int numPartitions;
-
- @Override
- public int selectShard(UUID chatRoomId)
- {
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+public interface ChatHomeService
+{
+ Mono<ChatRoom> getChatRoom(int shard, UUID id);
+ Flux<ChatRoom> getChatRooms(int shard);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class KafkaLikeShardingStrategy implements ShardingStrategy
+{
+ private final int numPartitions;
+
+ @Override
+ public int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+public class ShardedChatHome implements ChatHome
+{
+ private final ChatHome[] chatHomes;
+ private final Set<Integer> ownedShards;
+ private final ShardingStrategy shardingStrategy;
+
+
+ public ShardedChatHome(
+ ChatHome[] chatHomes,
+ ShardingStrategy shardingStrategy)
+ {
+ this.chatHomes = chatHomes;
+ this.shardingStrategy = shardingStrategy;
+ this.ownedShards = new HashSet<>();
+ for (int shard = 0; shard < chatHomes.length; shard++)
+ if(chatHomes[shard] != null)
+ this.ownedShards.add(shard);
+ log.info(
+ "Created ShardedChatHome for shards: {}",
+ ownedShards
+ .stream()
+ .map(String::valueOf)
+ .collect(Collectors.joining(", ")));
+ }
+
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ int shard = selectShard(id);
+ if (chatHomes[shard] == null)
+ throw new ShardNotOwnedException(shard);
+ return chatHomes[shard].getChatRoom(id);
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRooms());
+ }
+
+
+ private int selectShard(UUID chatroomId)
+ {
+ return shardingStrategy.selectShard(chatroomId);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import java.util.UUID;
+
+
+public interface ShardingStrategy
+{
+ int selectShard(UUID chatRoomId);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@Slf4j
+public class SimpleChatHome implements ChatHome
+{
+ private final ChatHomeService service;
+ private final int shard;
+
+
+ public SimpleChatHome(ChatHomeService service, int shard)
+ {
+ log.info("Created SimpleChatHome for shard {}", shard);
+ this.service = service;
+ this.shard = shard;
+ }
+
+ public SimpleChatHome(ChatHomeService service)
+ {
+ this(service, 0);
+ }
+
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ return service
+ .getChatRoom(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ return service.getChatRooms(shard);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.UUID;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-public class SimpleChatHomeTest
-{
- @Test
- @DisplayName("Assert chatroom is delivered, if it exists")
- void testGetExistingChatroom()
- {
- // Given
- ChatHomeService chatHomeService = mock(ChatHomeService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
- Clock.systemDefaultZone(),
- mock(ChatRoomService.class),
- 8);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
- // When
- Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
-
- // Then
- assertThat(mono).emitsExactly(chatRoom);
- }
-
- @Test
- @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
- void testGetNonExistentChatroom()
- {
- // Given
- ChatHomeService chatHomeService = mock(ChatHomeService.class);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
- // When
- Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
-
- // Then
- assertThat(mono).sendsError();
- }
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public class SimpleChatHomeTest
+{
+ @Test
+ @DisplayName("Assert chatroom is delivered, if it exists")
+ void testGetExistingChatroom()
+ {
+ // Given
+ ChatHomeService chatHomeService = mock(ChatHomeService.class);
+ ChatRoom chatRoom = new ChatRoom(
+ UUID.randomUUID(),
+ "Foo",
+ 0,
+ Clock.systemDefaultZone(),
+ mock(ChatRoomService.class),
+ 8);
+ when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
+ SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
+
+ // When
+ Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
+
+ // Then
+ assertThat(mono).emitsExactly(chatRoom);
+ }
+
+ @Test
+ @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+ void testGetNonExistentChatroom()
+ {
+ // Given
+ ChatHomeService chatHomeService = mock(ChatHomeService.class);
+ when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
+
+ // When
+ Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
+
+ // Then
+ assertThat(mono).sendsError();
+ }
+}