private final UUID id;
@Getter
private final String name;
- @Getter
- private final int shard;
private final Clock clock;
private final ChatRoomService service;
private final int bufferSize;
public ChatRoom(
UUID id,
String name,
- int shard,
Clock clock,
ChatRoomService service,
int bufferSize)
{
this.id = id;
this.name = name;
- this.shard = shard;
this.clock = clock;
this.service = service;
this.bufferSize = bufferSize;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class InMemoryChatHomeService implements ChatHomeService<InMemoryChatRoomService>
{
+ private final ShardingStrategy shardingStrategy;
private final Map<UUID, ChatRoom>[] chatrooms;
public InMemoryChatHomeService(
+ ShardingStrategy shardingStrategy,
int numShards,
int[] ownedShards,
Flux<ChatRoom> chatroomFlux)
{
log.debug("Creating ChatHomeService");
+ this.shardingStrategy = shardingStrategy;
this.chatrooms = new Map[numShards];
Set<Integer> owned = Arrays
.stream(ownedShards)
chatroomFlux
.filter(chatRoom ->
{
- if (owned.contains(chatRoom.getShard()))
+ int shard = shardingStrategy.selectShard(chatRoom.getId());
+ if (owned.contains(shard))
{
return true;
}
}
})
.toStream()
- .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
+ .forEach(chatRoom ->
+ {
+ getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
+ });
}
@Override
public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
{
- chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
return Mono.just(chatRoom);
}
{
return Flux.fromStream(chatrooms[shard].values().stream());
}
+
+
+ private Map<UUID, ChatRoom> getChatRoomMapFor(ChatRoom chatRoom)
+ {
+ int shard = shardingStrategy.selectShard(chatRoom.getId());
+ return chatrooms[shard];
+ }
}
public Mono<ChatRoom> createChatRoom(UUID id, String name)
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
- int shard = shardingStrategy.selectShard(id);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize));
+ return Mono.just(new ChatRoom(id, name, clock, service, bufferSize));
}
}
StorageStrategy storageStrategy)
{
int numShards = properties.getInmemory().getNumShards();
+ ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
storageStrategy
.read()
.subscribe(chatRoom ->
{
- int shard = chatRoom.getShard();
+ int shard = shardingStrategy.selectShard(chatRoom.getId());
if (chatHomes[shard] == null)
chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
});
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHome(chatHomes, strategy);
+ return new ShardedChatHome(chatHomes, shardingStrategy);
}
@Bean
InMemoryChatHomeService chatHomeService(
ChatBackendProperties properties,
+ ShardingStrategy shardingStrategy,
StorageStrategy storageStrategy)
{
ShardingStrategyType sharding =
? new int[] { 0 }
: properties.getInmemory().getOwnedShards();
return new InMemoryChatHomeService(
+ shardingStrategy,
numShards,
ownedShards,
storageStrategy.read());
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
public StorageStrategy storageStrategy(
ChatBackendProperties properties,
Clock clock,
- ShardingStrategy shardingStrategy,
ObjectMapper mapper)
{
return new FilesStorageStrategy(
Paths.get(properties.getInmemory().getStorageDirectory()),
clock,
properties.getChatroomBufferSize(),
- shardingStrategy,
+
messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
-import java.util.UUID;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
- private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
return Flux
.from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo ->
- {
- UUID chatRoomId = chatRoomTo.getId();
- int shard = shardingStrategy.selectShard(chatRoomId);
- return new ChatRoom(
+ .map(chatRoomTo -> new ChatRoom(
chatRoomTo.getId(),
chatRoomTo.getName(),
- shard,
clock,
factory.create(readMessages(chatRoomTo)),
- bufferSize);
- });
+ bufferSize));
}
public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
public StorageStrategy storageStrategy(
ChatRoomRepository chatRoomRepository,
ChatBackendProperties properties,
- Clock clock,
- ShardingStrategy shardingStrategy)
+ Clock clock)
{
return new MongoDbStorageStrategy(
chatRoomRepository,
clock,
properties.getChatroomBufferSize(),
- shardingStrategy,
messageFlux -> new InMemoryChatRoomService(messageFlux));
}
}
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
private final ChatRoomRepository repository;
private final Clock clock;
private final int bufferSize;
- private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
.map(chatRoomTo ->
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
- int shard = shardingStrategy.selectShard(chatRoomId);
return new ChatRoom(
chatRoomId,
chatRoomTo.getName(),
- shard,
clock,
factory.create(
Flux
ChatRoom chatRoom = new ChatRoom(
chatroomId,
"Test-ChatRoom",
- 0,
Clock.systemDefaultZone(),
chatRoomService, 8);
when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
ChatRoom chatRoom = new ChatRoom(
chatroomId,
"Test-ChatRoom",
- 0,
Clock.systemDefaultZone(),
chatRoomService, 8);
when(chatHomeService.getChatRoom(anyInt(), any(UUID.class)))
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
- 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
- 0,
Clock.systemDefaultZone(),
mock(ChatRoomService.class),
8);
path,
clock,
8,
- chatRoomId -> 0,
messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
{
return () -> new InMemoryChatHomeService(
+ chatRoomId -> 0,
1,
new int[] { 0 },
getStorageStrategy().read());
protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
{
return () -> new InMemoryChatHomeService(
+ chatRoomId -> 0,
1,
new int[] { 0 },
getStorageStrategy().read());
chatRoomRepository,
clock,
8,
- chatRoomId -> 0,
messageFlux -> new InMemoryChatRoomService(messageFlux));
}