<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
@Autowired
ChatBackendProperties properties;
@Autowired
- ChatHome chatHome;
+ ChatHome[] chatHomes;
@Autowired
StorageStrategy storageStrategy;
@PreDestroy
public void onExit()
{
- storageStrategy.write(chatHome.getChatRooms());
+ for (int shard = 0; shard < chatHomes.length; shard++)
+ storageStrategy.write(chatHomes[shard].getChatRooms());
}
public static void main(String[] args)
package de.juplo.kafka.chat.backend;
import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeFactory;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
public class ChatBackendConfiguration
{
@Bean
- public ChatHome chatHome(ChatHomeService chatHomeService)
+ ChatHome[] chatHomes(
+ ChatHomeFactory factory,
+ ChatBackendProperties properties,
+ StorageStrategy storageStrategy)
{
- return new ChatHome(chatHomeService, 0);
+ ChatHome[] chatHomes = new ChatHome[properties.getInmemory().getNumShards()];
+ storageStrategy
+ .read()
+ .subscribe(chatRoom ->
+ {
+ int shard = chatRoom.getShard();
+ if (chatHomes[shard] == null)
+ chatHomes[shard] = factory.createChatHome(shard);
+ });
+ return chatHomes;
}
@Bean
- public Clock clock()
+ Clock clock()
{
return Clock.systemDefaultZone();
}
@Setter
public class ChatBackendProperties
{
- private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
private String allowedOrigins = "http://localhost:4200";
private int chatroomBufferSize = 8;
+ private ServiceType services = ServiceType.inmemory;
+ private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
+
+
+ @Getter
+ @Setter
+ public static class InMemoryServicesProperties
+ {
+ private ShardingStrategyType shardingStrategy = ShardingStrategyType.kafkalike;
+ private int numShards = 10;
+ private int[] ownedShards = { 2 };
+ private StorageStrategyType storageStrategy = StorageStrategyType.files;
+ private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
+ }
+
+ public enum ServiceType { inmemory }
+ public enum StorageStrategyType { files, mongodb }
+ public enum ShardingStrategyType { none, kafkalike }
}
{
private UUID id;
private String name;
- private int shard;
public static ChatRoomTo from(ChatRoom chatroom)
ChatRoomTo to = new ChatRoomTo();
to.id = chatroom.getId();
to.name = chatroom.getName();
- to.shard = chatroom.getShard();
return to;
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.api;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class KafkaLikeShardingStrategy implements ShardingStrategy
+{
+ private final int numPartitions;
+
+ @Override
+ public int selectShard(UUID chatRoomId)
+ {
+ byte[] serializedKey = chatRoomId.toString().getBytes();
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+public interface ChatHomeFactory
+{
+ ChatHome createChatHome(int shard);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatHomeFactory;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class InMemoryChatHomeFactory implements ChatHomeFactory
+{
+ private final InMemoryChatHomeService service;
+
+
+ @Override
+ public ChatHome createChatHome(int shard)
+ {
+ return new ChatHome(service, shard);
+ }
+}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
@Slf4j
private final Map<UUID, ChatRoom>[] chatrooms;
- public InMemoryChatHomeService(int numShards, Flux<ChatRoom> chatroomFlux)
+ public InMemoryChatHomeService(
+ int numShards,
+ int[] ownedShards,
+ Flux<ChatRoom> chatroomFlux)
{
log.debug("Creating InMemoryChatHomeService");
this.chatrooms = new Map[numShards];
+ Set<Integer> owned = Arrays
+ .stream(ownedShards)
+ .collect(
+ () -> new HashSet<>(),
+ (set, i) -> set.add(i),
+ (a, b) -> a.addAll(b));
for (int shard = 0; shard < numShards; shard++)
- chatrooms[shard] = new HashMap<>();
+ {
+ chatrooms[shard] = owned.contains(shard)
+ ? new HashMap<>()
+ : null;
+ }
chatroomFlux
+ .filter(chatRoom ->
+ {
+ if (owned.contains(chatRoom.getShard()))
+ {
+ return true;
+ }
+ else
+ {
+ log.info("Ignoring not owned chat-room {}", chatRoom);
+ return false;
+ }
+ })
.toStream()
.forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
}
package de.juplo.kafka.chat.backend.persistence.inmemory;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.api.KafkaLikeShardingStrategy;
import de.juplo.kafka.chat.backend.api.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ConditionalOnProperty(
prefix = "chat.backend",
name = "services",
- havingValue = "in-memory",
+ havingValue = "inmemory",
matchIfMissing = true)
@Configuration
public class InMemoryServicesConfiguration
{
@Bean
- InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy)
+ InMemoryChatHomeService chatHomeService(
+ ChatBackendProperties properties,
+ StorageStrategy storageStrategy)
{
- return new InMemoryChatHomeService(1, storageStrategy.read());
+ return new InMemoryChatHomeService(
+ properties.getInmemory().getNumShards(),
+ properties.getInmemory().getOwnedShards(),
+ storageStrategy.read());
+ }
+
+ @Bean
+ InMemoryChatHomeFactory chatHomeFactory(InMemoryChatHomeService service)
+ {
+ return new InMemoryChatHomeFactory(service);
}
@Bean
properties.getChatroomBufferSize());
}
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "none",
+ matchIfMissing = true)
@Bean
- ShardingStrategy shardingStrategy()
+ ShardingStrategy defaultShardingStrategy()
{
return chatRoomId -> 0;
}
+
+ @ConditionalOnProperty(
+ prefix = "chat.backend.inmemory",
+ name = "sharding-strategy",
+ havingValue = "kafkalike")
+ @Bean
+ ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+ {
+ return new KafkaLikeShardingStrategy(
+ properties.getInmemory().getNumShards());
+ }
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@ConditionalOnProperty(
- prefix = "chat.backend",
- name = "storage",
+ prefix = "chat.backend.inmemory",
+ name = "storage-strategy",
havingValue = "files",
matchIfMissing = true)
@Configuration
public StorageStrategy storageStrategy(
ChatBackendProperties properties,
Clock clock,
+ ShardingStrategy shardingStrategy,
ObjectMapper mapper)
{
return new FilesStorageStrategy(
- Paths.get(properties.getStorageDirectory()),
+ Paths.get(properties.getInmemory().getStorageDirectory()),
clock,
properties.getChatroomBufferSize(),
+ shardingStrategy,
messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
+import java.util.UUID;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
+ private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
return Flux
.from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo -> new ChatRoom(
- chatRoomTo.getId(),
- chatRoomTo.getName(),
- chatRoomTo.getShard(),
- clock,
- factory.create(readMessages(chatRoomTo)),
- bufferSize));
+ .map(chatRoomTo ->
+ {
+ UUID chatRoomId = chatRoomTo.getId();
+ int shard = shardingStrategy.selectShard(chatRoomId);
+ return new ChatRoom(
+ chatRoomTo.getId(),
+ chatRoomTo.getName(),
+ shard,
+ clock,
+ factory.create(readMessages(chatRoomTo)),
+ bufferSize);
+ });
}
public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
@Id
private String id;
private String name;
- private int shard;
private List<MessageTo> messages;
public static ChatRoomTo from(ChatRoom chatroom)
return new ChatRoomTo(
chatroom.getId().toString(),
chatroom.getName(),
- chatroom.getShard(),
chatroom
.getMessages()
.map(MessageTo::from)
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ConditionalOnProperty(
- prefix = "chat.backend",
- name = "storage",
+ prefix = "chat.backend.inmemory",
+ name = "storage-strategy",
havingValue = "mongodb")
@Configuration
public class MongoDbStorageConfiguration
public StorageStrategy storageStrategy(
ChatRoomRepository chatRoomRepository,
ChatBackendProperties properties,
- Clock clock)
+ Clock clock,
+ ShardingStrategy shardingStrategy)
{
return new MongoDbStorageStrategy(
chatRoomRepository,
clock,
properties.getChatroomBufferSize(),
+ shardingStrategy,
messageFlux -> new InMemoryChatRoomService(messageFlux));
}
}
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+import de.juplo.kafka.chat.backend.api.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
private final ChatRoomRepository repository;
private final Clock clock;
private final int bufferSize;
+ private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
{
return Flux
.fromIterable(repository.findAll())
- .map(chatRoomTo -> new ChatRoom(
- UUID.fromString(chatRoomTo.getId()),
- chatRoomTo.getName(),
- chatRoomTo.getShard(),
- clock,
- factory.create(
- Flux
- .fromIterable(chatRoomTo.getMessages())
- .map(messageTo -> messageTo.toMessage())),
- bufferSize));
+ .map(chatRoomTo ->
+ {
+ UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+ int shard = shardingStrategy.selectShard(chatRoomId);
+ return new ChatRoom(
+ chatRoomId,
+ chatRoomTo.getName(),
+ shard,
+ clock,
+ factory.create(
+ Flux
+ .fromIterable(chatRoomTo.getMessages())
+ .map(messageTo -> messageTo.toMessage())),
+ bufferSize);
+ });
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend;
+
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "chat.backend.inmemory.storage-directory=target/test-classes/data/files",
+ "chat.backend.inmemory.sharding-strategy=kafkalike" })
+class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationIT
+{
+}
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
- properties = "chat.backend.storage-directory=target/test-classes/data/files")
+ properties = {
+ "chat.backend.inmemory.sharding-strategy=none",
+ "chat.backend.inmemory.num-shards=1",
+ "chat.backend.inmemory.owned-shards=0",
+ "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
class InMemoryWithFilesConfigurationIT extends AbstractConfigurationIT
{
}
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
- "chat.backend.storage=mongodb",
"spring.data.mongodb.host=localhost",
- "spring.data.mongodb.database=test" })
+ "spring.data.mongodb.database=test",
+ "chat.backend.inmemory.sharding-strategy=kafkalike",
+ "chat.backend.inmemory.storage-strategy=mongodb" })
@Testcontainers
@Slf4j
class InMemoryWithMongoDbConfigurationIT extends AbstractConfigurationIT
package de.juplo.kafka.chat.backend.api;
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;
import java.time.Clock;
import java.time.LocalDateTime;
+import java.util.Arrays;
import java.util.UUID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
-@SpringBootTest(properties = "spring.main.allow-bean-definition-overriding=true")
+@SpringBootTest(properties = {
+ "spring.main.allow-bean-definition-overriding=true",
+ "chat.backend.inmemory.owned-shards=0,1,2,3,4,5,6,7,8,9" })
@AutoConfigureWebTestClient
@Slf4j
public class ChatBackendControllerTest
{
@MockBean
- ChatHomeService chatHomeService;
+ InMemoryChatHomeService chatHomeService;
@MockBean
ChatRoomService chatRoomService;
.jsonPath("$.username").isEqualTo(user);
verify(chatRoomService, never()).persistMessage(eq(key), any(LocalDateTime.class), any(String.class));
}
+
+ @TestConfiguration
+ static class Config
+ {
+ @Bean
+ ChatHome[] chatHomes(
+ ChatBackendProperties properties,
+ ChatHomeFactory factory)
+ {
+ ChatHome[] chatHomes = new ChatHome[properties.getInmemory().getNumShards()];
+ Arrays
+ .stream(properties.getInmemory().getOwnedShards())
+ .forEach(i -> chatHomes[i] = factory.createChatHome(i));
+ return chatHomes;
+ }
+ }
}
path,
clock,
8,
+ chatRoomId -> 0,
messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
@Override
protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
{
- return () -> new InMemoryChatHomeService(1, getStorageStrategy().read());
+ return () -> new InMemoryChatHomeService(
+ 1,
+ new int[] { 0 },
+ getStorageStrategy().read());
}
@Override
@Override
protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
{
- return () -> new InMemoryChatHomeService(1, getStorageStrategy().read());
+ return () -> new InMemoryChatHomeService(
+ 1,
+ new int[] { 0 },
+ getStorageStrategy().read());
}
@Override
chatRoomRepository,
clock,
8,
+ chatRoomId -> 0,
messageFlux -> new InMemoryChatRoomService(messageFlux));
}
[ {
"id" : "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
- "name" : "FOO",
- "shard" : 0
-} ]
\ No newline at end of file
+ "name" : "FOO"
+} ]
{
"_id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
"name": "FOO",
- "shard": 0,
"messages": [
{
"_id": "peter--1",