import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.RequiredArgsConstructor;
import org.springframework.http.codec.ServerSentEvent;
public class ChatBackendController
{
private final ChatHome chatHome;
- private final ChatRoomFactory factory;
private final StorageStrategy storageStrategy;
public Mono<ChatRoomInfoTo> create(@RequestBody String name)
{
UUID chatRoomId = UUID.randomUUID();
- return factory
+ return chatHome
.createChatRoom(chatRoomId, name)
.map(ChatRoomInfoTo::from);
}
public interface ChatHome
{
+ Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
+
Mono<ChatRoom> getChatRoom(UUID id);
Flux<ChatRoom> getChatRooms();
public ChatRoom(
UUID id,
String name,
- int shard,
+ Integer shard,
Clock clock,
ChatRoomService service,
int bufferSize)
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-public interface ChatRoomFactory
-{
- Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
-}
@Getter
private final String name;
@Getter
- private final int shard;
+ private final Integer shard;
}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class InMemoryChatRoomFactory implements ChatRoomFactory
-{
- private final ShardingStrategy shardingStrategy;
- private final Clock clock;
- private final int bufferSize;
-
-
- @Override
- public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
- {
- log.info("Creating ChatRoom with buffer-size {}", bufferSize);
- int shard = shardingStrategy.selectShard(id);
- ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
- ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
- chatHomeService.putChatRoom(chatRoom);
- return Mono.just(chatRoom);
- }
-}
package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.domain.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
}
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ int shard = shardingStrategy.selectShard(id);
+ return chatHomes[shard] == null
+ ? Mono.error(new ShardNotOwnedException(shard))
+ : chatHomes[shard].createChatRoom(id, name);
+ }
+
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
package de.juplo.kafka.chat.backend.persistence.inmemory;
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.domain.*;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.time.Clock;
import java.util.*;
@Slf4j
public class SimpleChatHome implements ChatHome
{
- private final Map<UUID, ChatRoom> chatrooms;
+ private final Integer shard;
+ private final Map<UUID, ChatRoom> chatRooms;
+ private final Clock clock;
+ private final int bufferSize;
- public SimpleChatHome(Flux<ChatRoom> chatroomFlux)
+ public SimpleChatHome(
+ Flux<ChatRoom> chatroomFlux,
+ Clock clock,
+ int bufferSize)
{
- this(chatroomFlux, null);
+ this(null, chatroomFlux, clock, bufferSize);
}
public SimpleChatHome(
Integer shard,
- Flux<ChatRoom> chatroomFlux)
+ Flux<ChatRoom> chatroomFlux,
+ Clock clock,
+ int bufferSize)
{
log.info("Created SimpleChatHome for shard {}", shard);
-
- this.chatrooms = new HashMap<>();
+;
+ this.shard = shard;
+ this.chatRooms = new HashMap<>();
chatroomFlux
.filter(chatRoom ->
{
}
})
.toStream()
- .forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
+ .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom));
+ this.clock = clock;
+ this.bufferSize = bufferSize;
}
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+ ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ chatRooms.put(id, chatRoom);
+ return Mono.just(chatRoom);
+ }
+
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
return Mono
- .justOrEmpty(chatrooms.get(id))
+ .justOrEmpty(chatRooms.get(id))
.switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
@Override
public Flux<ChatRoom> getChatRooms()
{
- return Flux.fromIterable(chatrooms.values());
+ return Flux.fromIterable(chatRooms.values());
}
}
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
private final ChatRoomChannel chatRoomChannel;
+
+ @Override
+ public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+ {
+ log.info("Sending create-command for chat rooom: id={}, name={}");
+ return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ }
+
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatRoomFactory implements ChatRoomFactory
-{
- private final ChatRoomChannel chatRoomChannel;
-
- @Override
- public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
- {
- log.info("Sending create-command for chat rooom: id={}, name={}");
- return chatRoomChannel.sendCreateChatRoomRequest(id, name);
- }
-}
chatRoomChannel);
}
- @Bean
- KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel)
- {
- return new KafkaChatRoomFactory(chatRoomChannel);
- }
-
@Bean
ChatRoomChannel chatRoomChannel(
ChatBackendProperties properties,