@RequiredArgsConstructor
public class ChatBackendController
{
- private final ChatHome[] chatHomes;
- private final ShardingStrategy selectionStrategy;
+ private final ChatHome chatHome;
private final ChatRoomFactory factory;
private final StorageStrategy storageStrategy;
UUID chatRoomId = UUID.randomUUID();
return factory
.createChatRoom(chatRoomId, name)
- .flatMap(chatRoom -> chatHomes[chatRoom.getShard()].putChatRoom(chatRoom))
+ .flatMap(chatRoom -> chatHome.putChatRoom(chatRoom))
.map(ChatRoomTo::from);
}
@GetMapping("list")
public Flux<ChatRoomTo> list()
{
- return Flux
- .fromArray(chatHomes)
- .flatMap(chatHome -> chatHome.getChatRooms())
+ return chatHome
+ .getChatRooms()
.map(chatroom -> ChatRoomTo.from(chatroom));
}
@GetMapping("{chatroomId}/list")
public Flux<MessageTo> list(@PathVariable UUID chatroomId)
{
- return chatHomes[selectShard(chatroomId)]
+ return chatHome
.getChatRoom(chatroomId)
.flatMapMany(chatroom -> chatroom
.getMessages()
@GetMapping("{chatroomId}")
public Mono<ChatRoomTo> get(@PathVariable UUID chatroomId)
{
- return chatHomes[selectShard(chatroomId)]
+ return chatHome
.getChatRoom(chatroomId)
.map(chatroom -> ChatRoomTo.from(chatroom));
}
@RequestBody String text)
{
return
- chatHomes[selectShard(chatroomId)]
+ chatHome
.getChatRoom(chatroomId)
.flatMap(chatroom -> put(chatroom, username, messageId, text));
}
@PathVariable Long messageId)
{
return
- chatHomes[selectShard(chatroomId)]
+ chatHome
.getChatRoom(chatroomId)
.flatMap(chatroom -> get(chatroom, username, messageId));
}
@GetMapping(path = "{chatroomId}/listen")
public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
{
- return chatHomes[selectShard(chatroomId)]
+ return chatHome
.getChatRoom(chatroomId)
.flatMapMany(chatroom -> listen(chatroom));
}
@PostMapping("/store")
public void store()
{
- for (int shard = 0; shard < chatHomes.length; shard++)
- storageStrategy.write(chatHomes[shard].getChatRooms());
- }
-
- private int selectShard(UUID chatroomId)
- {
- return selectionStrategy.selectShard(chatroomId);
+ storageStrategy.write(chatHome.getChatRooms());
}
}
+++ /dev/null
-package de.juplo.kafka.chat.backend.api;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-public class KafkaLikeShardingStrategy implements ShardingStrategy
-{
- private final int numPartitions;
-
- @Override
- public int selectShard(UUID chatRoomId)
- {
- byte[] serializedKey = chatRoomId.toString().getBytes();
- return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.api;
-
-import java.util.UUID;
-
-
-public interface ShardingStrategy
-{
- int selectShard(UUID chatRoomId);
-}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.RequiredArgsConstructor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class ShardedChatHome implements ChatHome
+{
+ private final ChatHome[] chatHomes;
+ private final ShardingStrategy selectionStrategy;
+
+
+ @Override
+ public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
+ {
+ return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom);
+ }
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(UUID id)
+ {
+ return chatHomes[selectShard(id)].getChatRoom(id);
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms()
+ {
+ return Flux
+ .fromArray(chatHomes)
+ .flatMap(chatHome -> chatHome.getChatRooms());
+ }
+
+
+ private int selectShard(UUID chatroomId)
+ {
+ return selectionStrategy.selectShard(chatroomId);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import java.util.UUID;
+
+
+public interface ShardingStrategy
+{
+ int selectShard(UUID chatRoomId);
+}
private final ChatHomeService service;
private final int shard;
+
+ public SimpleChatHome(ChatHomeService service)
+ {
+ this(service, 0);
+ }
+
+
@Override
public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
{
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class KafkaLikeShardingStrategy implements ShardingStrategy
+{
+ private final int numPartitions;
+
+ @Override
+ public int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
+}
package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
import de.juplo.kafka.chat.backend.domain.ChatRoomService;
package de.juplo.kafka.chat.backend.persistence.inmemory;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.api.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
+import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
public class InMemoryServicesConfiguration
{
@Bean
- ChatHome[] chatHomes(
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "none",
+ matchIfMissing = true)
+ ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+ {
+ return new SimpleChatHome(chatHomeService);
+ }
+
+ @Bean
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "kafkalike")
+ ChatHome kafkalikeShardingChatHome(
ChatBackendProperties properties,
InMemoryChatHomeService chatHomeService,
StorageStrategy storageStrategy)
{
- SimpleChatHome[] chatHomes = new SimpleChatHome[properties.getInmemory().getNumShards()];
+ int numShards = properties.getInmemory().getNumShards();
+ SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
storageStrategy
.read()
.subscribe(chatRoom ->
if (chatHomes[shard] == null)
chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
});
- return chatHomes;
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+ return new ShardedChatHome(chatHomes, strategy);
}
@Bean
ChatBackendProperties properties,
StorageStrategy storageStrategy)
{
+ ShardingStrategyType sharding =
+ properties.getInmemory().getShardingStrategy();
+ int numShards = sharding == ShardingStrategyType.none
+ ? 1
+ : properties.getInmemory().getNumShards();
+ int[] ownedShards = sharding == ShardingStrategyType.none
+ ? new int[] { 0 }
+ : properties.getInmemory().getOwnedShards();
return new InMemoryChatHomeService(
- properties.getInmemory().getNumShards(),
- properties.getInmemory().getOwnedShards(),
+ numShards,
+ ownedShards,
storageStrategy.read());
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"chat.backend.inmemory.sharding-strategy=none",
- "chat.backend.inmemory.num-shards=1",
- "chat.backend.inmemory.owned-shards=0",
"chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
class InMemoryWithFilesConfigurationIT extends AbstractConfigurationIT
{
package de.juplo.kafka.chat.backend.api;
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.*;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;
import java.time.Clock;
import java.time.LocalDateTime;
-import java.util.Arrays;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
@SpringBootTest(properties = {
"spring.main.allow-bean-definition-overriding=true",
- "chat.backend.inmemory.owned-shards=0,1,2,3,4,5,6,7,8,9" })
+ "chat.backend.inmemory.sharding-strategy=none" })
@AutoConfigureWebTestClient
@Slf4j
public class ChatBackendControllerTest
.jsonPath("$.username").isEqualTo(user);
verify(chatRoomService, never()).persistMessage(eq(key), any(LocalDateTime.class), any(String.class));
}
-
- @TestConfiguration
- static class Config
- {
- @Bean
- ChatHome[] chatHomes(
- ChatBackendProperties properties,
- InMemoryChatHomeService service)
- {
- SimpleChatHome[] chatHomes = new SimpleChatHome[properties.getInmemory().getNumShards()];
- Arrays
- .stream(properties.getInmemory().getOwnedShards())
- .forEach(i -> chatHomes[i] = new SimpleChatHome(service, i));
- return chatHomes;
- }
- }
}
mock(ChatRoomService.class),
8);
when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService, 0);
+ SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
// When
Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
// Given
ChatHomeService chatHomeService = mock(ChatHomeService.class);
when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
- SimpleChatHome chatHome = new SimpleChatHome(chatHomeService, 0);
+ SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
// When
Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
protected void start()
{
- chathome = new SimpleChatHome(getChatHomeServiceSupplier().get(), 0);
+ chathome = new SimpleChatHome(getChatHomeServiceSupplier().get());
chatRoomFactory = getChatRoomFactory();
}
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
package de.juplo.kafka.chat.backend.persistence;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;