@Autowired
ChatBackendProperties properties;
@Autowired
- ChatHome[] chatHomes;
+ ChatHome chatHome;
@Autowired
StorageStrategy storageStrategy;
@PreDestroy
public void onExit()
{
- for (int shard = 0; shard < chatHomes.length; shard++)
- storageStrategy.write(chatHomes[shard].getChatRooms());
+ storageStrategy.write(chatHome.getChatRooms());
}
public static void main(String[] args)
package de.juplo.kafka.chat.backend.domain;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.util.UUID;
+import java.util.*;
-public interface ChatHome
+@RequiredArgsConstructor
+@Slf4j
+public class ChatHome
{
- Mono<ChatRoom> getChatRoom(UUID id);
+ private final ChatHomeService service;
- Flux<ChatRoom> getChatRooms();
+
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ return service
+ .getChatRoom(id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+ }
+
+ public Flux<ChatRoom> getChatRooms()
+ {
+ return service.getChatRooms();
+ }
}
public interface ChatHomeService
{
- Mono<ChatRoom> getChatRoom(int shard, UUID id);
- Flux<ChatRoom> getChatRooms(int shard);
+ Mono<ChatRoom> getChatRoom(UUID id);
+ Flux<ChatRoom> getChatRooms();
}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.stream.Collectors;
+
+
+public class ShardNotOwnedException extends IllegalStateException
+{
+ @Getter
+ private final ChatHomeService chatHomeService;
+ @Getter
+ private final ChatRoomInfo chatRoomInfo;
+ @Getter
+ private final int shard;
+ @Getter
+ private final int[] ownedShards;
+
+
+ public ShardNotOwnedException(
+ ChatHomeService chatHomeService,
+ ChatRoomInfo chatRoomInfo,
+ int shard,
+ Collection<Integer> ownedShards)
+ {
+ this(
+ chatHomeService,
+ chatRoomInfo,
+ shard,
+ ShardNotOwnedException.toArray(ownedShards));
+ }
+
+ public ShardNotOwnedException(
+ ChatHomeService chatHomeService,
+ ChatRoomInfo chatRoomInfo,
+ int shard,
+ int[] ownedShards)
+ {
+ super(
+ chatHomeService +
+ " does not own the shard " +
+ shard +
+ " for ChatRoom " +
+ chatRoomInfo +
+ " owned shards: " +
+ Arrays
+ .stream(ownedShards)
+ .mapToObj(ownedShard -> Integer.toString(ownedShard))
+ .collect(Collectors.joining(", ")));
+ this.chatHomeService = chatHomeService;
+ this.chatRoomInfo = chatRoomInfo;
+ this.shard = shard;
+ this.ownedShards = ownedShards;
+ }
+
+
+ private static int[] toArray(Collection<Integer> collection)
+ {
+ int[] array = new int[collection.size()];
+ Iterator<Integer> iterator = collection.iterator();
+ for (int i = 0; iterator.hasNext(); i++)
+ array[i] = iterator.next();
+ return array;
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class ShardedChatHome implements ChatHome
-{
- private final ChatHome[] chatHomes;
- private final Set<Integer> ownedShards;
- private final ShardingStrategy shardingStrategy;
-
-
- public ShardedChatHome(
- ChatHome[] chatHomes,
- ShardingStrategy shardingStrategy)
- {
- this.chatHomes = chatHomes;
- this.shardingStrategy = shardingStrategy;
- this.ownedShards = new HashSet<>();
- for (int shard = 0; shard < chatHomes.length; shard++)
- if(chatHomes[shard] != null)
- this.ownedShards.add(shard);
- log.info(
- "Created ShardedChatHome for shards: {}",
- ownedShards
- .stream()
- .map(String::valueOf)
- .collect(Collectors.joining(", ")));
- }
-
-
- @Override
- public Mono<ChatRoom> getChatRoom(UUID id)
- {
- return chatHomes[selectShard(id)].getChatRoom(id);
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms()
- {
- return Flux
- .fromIterable(ownedShards)
- .flatMap(shard -> chatHomes[shard].getChatRooms());
- }
-
-
- private int selectShard(UUID chatroomId)
- {
- return shardingStrategy.selectShard(chatroomId);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@Slf4j
-public class SimpleChatHome implements ChatHome
-{
- private final ChatHomeService service;
- private final int shard;
-
-
- public SimpleChatHome(ChatHomeService service, int shard)
- {
- log.info("Created SimpleChatHome for shard {}", shard);
- this.service = service;
- this.shard = shard;
- }
-
- public SimpleChatHome(ChatHomeService service)
- {
- this(service, 0);
- }
-
-
- @Override
- public Mono<ChatRoom> getChatRoom(UUID id)
- {
- return service
- .getChatRoom(shard, id)
- .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
- }
-
- @Override
- public Flux<ChatRoom> getChatRooms()
- {
- return service.getChatRooms(shard);
- }
-}
import lombok.Getter;
+import java.util.Arrays;
+import java.util.Optional;
import java.util.UUID;
+import java.util.stream.Collectors;
-public class UnknownChatroomException extends RuntimeException
+public class UnknownChatroomException extends IllegalStateException
{
@Getter
private final UUID chatroomId;
+ @Getter
+ private final Optional<Integer> shard;
+ @Getter
+ private final Optional<int[]> ownedShards;
public UnknownChatroomException(UUID chatroomId)
{
super("Chatroom does not exist: " + chatroomId);
this.chatroomId = chatroomId;
+ this.shard = Optional.empty();
+ this.ownedShards = Optional.empty();
+ }
+
+ public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards)
+ {
+ super(
+ "Chatroom does not exist (here): " +
+ chatroomId +
+ " shard=" +
+ shard +
+ ", owned=" +
+ Arrays
+ .stream(ownedShards)
+ .mapToObj(ownedShard -> Integer.toString(ownedShard))
+ .collect(Collectors.joining(",")));
+ this.chatroomId = chatroomId;
+ this.shard = Optional.of(shard);
+ this.ownedShards = Optional.of(ownedShards);
}
}
package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class InMemoryChatHomeService implements ChatHomeService
{
private final Map<UUID, ChatRoom>[] chatrooms;
+ private final Set<Integer> ownedShards;
+ private final ShardingStrategy shardingStrategy;
public InMemoryChatHomeService(
int numShards,
int[] ownedShards,
+ ShardingStrategy shardingStrategy,
Flux<ChatRoom> chatroomFlux)
{
log.debug("Creating InMemoryChatHomeService");
+
this.chatrooms = new Map[numShards];
- Set<Integer> owned = Arrays
+
+ this.ownedShards = Arrays
.stream(ownedShards)
.collect(
() -> new HashSet<>(),
(set, i) -> set.add(i),
(a, b) -> a.addAll(b));
+
+ this.shardingStrategy = shardingStrategy;
+
for (int shard = 0; shard < numShards; shard++)
{
- chatrooms[shard] = owned.contains(shard)
+ chatrooms[shard] = this.ownedShards.contains(shard)
? new HashMap<>()
: null;
}
chatroomFlux
.filter(chatRoom ->
{
- if (owned.contains(chatRoom.getShard()))
+ if (this.ownedShards.contains(chatRoom.getShard()))
{
return true;
}
.forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
}
- public void putChatRoom(ChatRoom chatRoom)
+ void putChatRoom(ChatRoom chatRoom)
{
- chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ UUID id = chatRoom.getId();
+ int shard = shardingStrategy.selectShard(id);
+ if (!ownedShards.contains(shard))
+ throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards);
+ chatrooms[shard].put(id, chatRoom);
}
@Override
- public Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ public Mono<ChatRoom> getChatRoom(UUID id)
{
- return Mono.justOrEmpty(chatrooms[shard].get(id));
+ int shard = shardingStrategy.selectShard(id);
+ if (ownedShards.contains(shard))
+ {
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
+ }
+ else
+ {
+ int[] ownedShards = new int[this.ownedShards.size()];
+ Iterator<Integer> iterator = this.ownedShards.iterator();
+ for (int i = 0; iterator.hasNext(); i++)
+ {
+ ownedShards[i] = iterator.next();
+ }
+ return Mono.error(new UnknownChatroomException(
+ id,
+ shard,
+ ownedShards));
+ }
}
@Override
- public Flux<ChatRoom> getChatRooms(int shard)
+ public Flux<ChatRoom> getChatRooms()
{
- return Flux.fromStream(chatrooms[shard].values().stream());
+ return Flux
+ .fromIterable(ownedShards)
+ .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
}
}
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
-import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
public class InMemoryServicesConfiguration
{
@Bean
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "none",
- matchIfMissing = true)
- ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
- {
- return new SimpleChatHome(chatHomeService);
- }
-
- @Bean
- @ConditionalOnProperty(
- prefix = "chat.backend.inmemory",
- name = "sharding-strategy",
- havingValue = "kafkalike")
- ChatHome kafkalikeShardingChatHome(
- ChatBackendProperties properties,
- InMemoryChatHomeService chatHomeService,
- StorageStrategy storageStrategy)
+ ChatHome chatHome(InMemoryChatHomeService chatHomeService)
{
- int numShards = properties.getInmemory().getNumShards();
- SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
- storageStrategy
- .read()
- .subscribe(chatRoom ->
- {
- int shard = chatRoom.getShard();
- if (chatHomes[shard] == null)
- chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
- });
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHome(chatHomes, strategy);
+ return new ChatHome(chatHomeService);
}
@Bean
{
ShardingStrategyType sharding =
properties.getInmemory().getShardingStrategy();
- int numShards = sharding == ShardingStrategyType.none
- ? 1
- : properties.getInmemory().getNumShards();
- int[] ownedShards = sharding == ShardingStrategyType.none
- ? new int[] { 0 }
- : properties.getInmemory().getOwnedShards();
+
+ int numShards;
+ int[] ownedShards;
+ ShardingStrategy shardingStrategy;
+
+ switch (sharding)
+ {
+ case none:
+ numShards = 1;
+ ownedShards = new int[] { 0 };
+ shardingStrategy = id -> 0;
+ break;
+ case kafkalike:
+ numShards = properties.getInmemory().getNumShards();
+ ownedShards = properties.getInmemory().getOwnedShards();
+ shardingStrategy = new KafkaLikeShardingStrategy(numShards);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown sharding strategy: " + sharding);
+ }
+
return new InMemoryChatHomeService(
numShards,
ownedShards,
+ shardingStrategy,
storageStrategy.read());
}
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
// When
WebTestClient.ResponseSpec responseSpec = client
UUID chatroomId = UUID.randomUUID();
String username = "foo";
Long messageId = 66l;
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
// When
WebTestClient.ResponseSpec responseSpec = client
{
// Given
UUID chatroomId = UUID.randomUUID();
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+ when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
// When
WebTestClient.ResponseSpec responseSpec = client
0,
Clock.systemDefaultZone(),
chatRoomService, 8);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
+ when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom));
Message existingMessage = new Message(
key,
serialNumberExistingMessage,
0,
Clock.systemDefaultZone(),
chatRoomService, 8);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class)))
+ when(chatHomeService.getChatRoom(any(UUID.class)))
.thenReturn(Mono.just(chatRoom));
when(chatRoomService.getMessage(any(Message.MessageKey.class)))
.thenReturn(Mono.empty());
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public class ChatHomeTest
+{
+ @Test
+ @DisplayName("Assert chatroom is delivered, if it exists")
+ void testGetExistingChatroom()
+ {
+ // Given
+ ChatHomeService chatHomeService = mock(ChatHomeService.class);
+ ChatRoom chatRoom = new ChatRoom(
+ UUID.randomUUID(),
+ "Foo",
+ 0,
+ Clock.systemDefaultZone(),
+ mock(ChatRoomService.class),
+ 8);
+ when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom));
+ ChatHome chatHome = new ChatHome(chatHomeService);
+
+ // When
+ Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
+
+ // Then
+ assertThat(mono).emitsExactly(chatRoom);
+ }
+
+ @Test
+ @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+ void testGetNonExistentChatroom()
+ {
+ // Given
+ ChatHomeService chatHomeService = mock(ChatHomeService.class);
+ when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty());
+ ChatHome chatHome = new ChatHome(chatHomeService);
+
+ // When
+ Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
+
+ // Then
+ assertThat(mono).sendsError();
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.UUID;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-public class SimpleChatHomeTest
-{
- @Test
- @DisplayName("Assert chatroom is delivered, if it exists")
- void testGetExistingChatroom()
- {
- // Given
- ChatHomeService chatHomeService = mock(ChatHomeService.class);
- ChatRoom chatRoom = new ChatRoom(
- UUID.randomUUID(),
- "Foo",
- 0,
- Clock.systemDefaultZone(),
- mock(ChatRoomService.class),
- 8);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
- // When
- Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
-
- // Then
- assertThat(mono).emitsExactly(chatRoom);
- }
-
- @Test
- @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
- void testGetNonExistentChatroom()
- {
- // Given
- ChatHomeService chatHomeService = mock(ChatHomeService.class);
- when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
- // When
- Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
-
- // Then
- assertThat(mono).sendsError();
- }
-}
InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService(
1,
new int[] { 0 },
+ id -> 0,
getStorageStrategy().read());
InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory(
protected void start()
{
StorageStrategyITConfig config = getConfig();
- chathome = new SimpleChatHome(config.getChatHomeService());
+ chathome = new ChatHome(config.getChatHomeService());
chatRoomFactory = config.getChatRoomFactory();
}