package de.juplo.kafka.chat.backend.domain;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
{
public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$");
- @Getter
- private final UUID id;
private final ChatRoomService service;
private final Clock clock;
private final int bufferSize;
public ChatRoomData(
- UUID id,
ChatRoomService service,
Clock clock,
int bufferSize)
{
- log.info("Created ChatRoom {id} with buffer-size {}", id, bufferSize);
- this.id = id;
+ log.info("Created ChatRoom with buffer-size {}", bufferSize);
this.service = service;
this.clock = clock;
this.bufferSize = bufferSize;
ChatHome kafkalikeShardingChatHome(
ChatBackendProperties properties,
StorageStrategy storageStrategy,
+ ChatRoomServiceFactory chatRoomServiceFactory,
Clock clock)
{
int numShards = properties.getInmemory().getNumShards();
.of(properties.getInmemory().getOwnedShards())
.forEach(shard -> chatHomes[shard] = new SimpleChatHome(
shard,
- storageStrategy.readChatRoomData(),
+ storageStrategy,
+ chatRoomServiceFactory,
clock,
properties.getChatroomBufferSize()));
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
chatRoomData.put(
info.getId(),
new ChatRoomData(
- chatRoomId,
chatRoomServiceFactory.create(messageFlux),
clock,
bufferSize));
log.info("Creating ChatRoom with buffer-size {}", bufferSize);
ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
- ChatRoomData chatRoomData = new ChatRoomData(id, service, clock, bufferSize);
+ ChatRoomData chatRoomData = new ChatRoomData(service, clock, bufferSize);
this.chatRoomData.put(id, chatRoomData);
return Mono.just(chatRoomInfo);
}