- Technically, the attribute is not needed.
- But, to implement the gua/sha-pattern, it is needed as routing-hing.
- Hence, it is reintroduced here and also added to the `ChatRoomTo` to
be send to the client.
{
private UUID id;
private String name;
+ private int shard;
public static ChatRoomTo from(ChatRoom chatroom)
ChatRoomTo to = new ChatRoomTo();
to.id = chatroom.getId();
to.name = chatroom.getName();
+ to.shard = chatroom.getShard();
return to;
}
}
private final UUID id;
@Getter
private final String name;
+ @Getter
+ private final int shard;
private final Clock clock;
private final ChatRoomService service;
private final int bufferSize;
public ChatRoom(
UUID id,
String name,
+ int shard,
Clock clock,
ChatRoomService service,
int bufferSize)
log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize);
this.id = id;
this.name = name;
+ this.shard = shard;
this.clock = clock;
this.service = service;
this.bufferSize = bufferSize;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class InMemoryChatHomeService implements ChatHomeService
{
- private final ShardingStrategy shardingStrategy;
private final Map<UUID, ChatRoom>[] chatrooms;
public InMemoryChatHomeService(
- ShardingStrategy shardingStrategy,
int numShards,
int[] ownedShards,
Flux<ChatRoom> chatroomFlux)
{
log.debug("Creating InMemoryChatHomeService");
- this.shardingStrategy = shardingStrategy;
this.chatrooms = new Map[numShards];
Set<Integer> owned = Arrays
.stream(ownedShards)
chatroomFlux
.filter(chatRoom ->
{
- int shard = shardingStrategy.selectShard(chatRoom.getId());
- if (owned.contains(shard))
+ if (owned.contains(chatRoom.getShard()))
{
return true;
}
}
})
.toStream()
- .forEach(chatRoom ->
- {
- getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
- });
+ .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
}
@Override
public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
{
- getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom);
+ chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
return Mono.just(chatRoom);
}
{
return Flux.fromStream(chatrooms[shard].values().stream());
}
-
-
- private Map<UUID, ChatRoom> getChatRoomMapFor(ChatRoom chatRoom)
- {
- int shard = shardingStrategy.selectShard(chatRoom.getId());
- return chatrooms[shard];
- }
}
public Mono<ChatRoom> createChatRoom(UUID id, String name)
{
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+ int shard = shardingStrategy.selectShard(id);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- return Mono.just(new ChatRoom(id, name, clock, service, bufferSize));
+ return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize));
}
}
StorageStrategy storageStrategy)
{
int numShards = properties.getInmemory().getNumShards();
- ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards);
SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
storageStrategy
.read()
.subscribe(chatRoom ->
{
- int shard = shardingStrategy.selectShard(chatRoom.getId());
+ int shard = chatRoom.getShard();
if (chatHomes[shard] == null)
chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
});
- return new ShardedChatHome(chatHomes, shardingStrategy);
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+ return new ShardedChatHome(chatHomes, strategy);
}
@Bean
InMemoryChatHomeService chatHomeService(
ChatBackendProperties properties,
- ShardingStrategy shardingStrategy,
StorageStrategy storageStrategy)
{
ShardingStrategyType sharding =
? new int[] { 0 }
: properties.getInmemory().getOwnedShards();
return new InMemoryChatHomeService(
- shardingStrategy,
numShards,
ownedShards,
storageStrategy.read());
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
public StorageStrategy storageStrategy(
ChatBackendProperties properties,
Clock clock,
+ ShardingStrategy shardingStrategy,
ObjectMapper mapper)
{
return new FilesStorageStrategy(
Paths.get(properties.getInmemory().getStorageDirectory()),
clock,
properties.getChatroomBufferSize(),
-
+ shardingStrategy,
messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatRoomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
+import java.util.UUID;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
private final Path storagePath;
private final Clock clock;
private final int bufferSize;
+ private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
private final ObjectMapper mapper;
return Flux
.from(new JsonFilePublisher<ChatRoomTo>(chatroomsPath(), mapper, type))
.log()
- .map(chatRoomTo -> new ChatRoom(
+ .map(chatRoomTo ->
+ {
+ UUID chatRoomId = chatRoomTo.getId();
+ int shard = shardingStrategy.selectShard(chatRoomId);
+ return new ChatRoom(
chatRoomTo.getId(),
chatRoomTo.getName(),
+ shard,
clock,
factory.create(readMessages(chatRoomTo)),
- bufferSize));
+ bufferSize);
+ });
}
public void writeMessages(ChatRoomTo chatroomTo, Flux<Message> messageFlux)
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
public StorageStrategy storageStrategy(
ChatRoomRepository chatRoomRepository,
ChatBackendProperties properties,
- Clock clock)
+ Clock clock,
+ ShardingStrategy shardingStrategy)
{
return new MongoDbStorageStrategy(
chatRoomRepository,
clock,
properties.getChatroomBufferSize(),
+ shardingStrategy,
messageFlux -> new InMemoryChatRoomService(messageFlux));
}
}
package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
private final ChatRoomRepository repository;
private final Clock clock;
private final int bufferSize;
+ private final ShardingStrategy shardingStrategy;
private final ChatRoomServiceFactory factory;
.map(chatRoomTo ->
{
UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
+ int shard = shardingStrategy.selectShard(chatRoomId);
return new ChatRoom(
chatRoomId,
chatRoomTo.getName(),
+ shard,
clock,
factory.create(
Flux
ChatRoom chatRoom = new ChatRoom(
chatroomId,
"Test-ChatRoom",
+ 0,
Clock.systemDefaultZone(),
chatRoomService, 8);
when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
ChatRoom chatRoom = new ChatRoom(
chatroomId,
"Test-ChatRoom",
+ 0,
Clock.systemDefaultZone(),
chatRoomService, 8);
when(chatHomeService.getChatRoom(anyInt(), any(UUID.class)))
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
+ 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
+ 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
+ 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
+ 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
+ 0,
Clock.systemDefaultZone(),
chatRoomService,
8);
ChatRoom chatRoom = new ChatRoom(
UUID.randomUUID(),
"Foo",
+ 0,
Clock.systemDefaultZone(),
mock(ChatRoomService.class),
8);
path,
clock,
8,
+ chatRoomId -> 0,
messageFlux -> new InMemoryChatRoomService(messageFlux),
mapper);
}
protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
{
return () -> new InMemoryChatHomeService(
- chatRoomId -> 0,
1,
new int[] { 0 },
getStorageStrategy().read());
protected Supplier<ChatHomeService> getChatHomeServiceSupplier()
{
return () -> new InMemoryChatHomeService(
- chatRoomId -> 0,
1,
new int[] { 0 },
getStorageStrategy().read());
chatRoomRepository,
clock,
8,
+ chatRoomId -> 0,
messageFlux -> new InMemoryChatRoomService(messageFlux));
}