private final UUID id;
@Getter
private final String name;
- private final PersistenceStrategy persistence;
+ private final ChatroomService chatroomService;
private final int bufferSize;
private Sinks.Many<Message> sink;
public Chatroom(
UUID id,
String name,
- PersistenceStrategy persistence,
+ ChatroomService chatroomService,
int bufferSize)
{
this.id = id;
this.name = name;
- this.persistence = persistence;
+ this.chatroomService = chatroomService;
this.bufferSize = bufferSize;
this.sink = createSink();
}
String user,
String text)
{
- return persistence
+ return chatroomService
.persistMessage(Message.MessageKey.of(user, id), timestamp, text)
.doOnNext(message ->
{
public Mono<Message> getMessage(String username, Long messageId)
{
- return persistence.getMessage(Message.MessageKey.of(username, messageId));
+ Message.MessageKey key = Message.MessageKey.of(username, messageId);
+ return chatroomService.getMessage(key);
}
synchronized public Flux<Message> listen()
public Flux<Message> getMessages(long first, long last)
{
- return persistence.getMessages(first, last);
+ return chatroomService.getMessages(first, last);
}
private Sinks.Many<Message> createSink()
import java.util.UUID;
-public interface ChatroomFactory<Strategy extends PersistenceStrategy>
+public interface ChatroomFactory<T extends ChatroomService>
{
Chatroom createChatroom(UUID id, String name);
}
import java.time.LocalDateTime;
-public interface PersistenceStrategy
+public interface ChatroomService
{
Mono<Message> persistMessage(
Message.MessageKey key,
@RequiredArgsConstructor
-public class InMemoryChatroomFactory implements ChatroomFactory<InMemoryPersistenceStrategy>
+public class InMemoryChatroomFactory implements ChatroomFactory<InMemoryChatroomService>
{
private final int bufferSize;
@Override
public Chatroom createChatroom(UUID id, String name)
{
- InMemoryPersistenceStrategy persistenceStrategy =
- new InMemoryPersistenceStrategy(new LinkedHashMap<>());
- return new Chatroom(id, name, persistenceStrategy, bufferSize);
+ InMemoryChatroomService chatroomService =
+ new InMemoryChatroomService(new LinkedHashMap<>());
+ return new Chatroom(id, name, chatroomService, bufferSize);
}
public Chatroom restoreChatroom(
UUID id,
String name,
- InMemoryPersistenceStrategy persistenceStrategy)
+ InMemoryChatroomService chatroomService)
{
- return new Chatroom(id, name, persistenceStrategy, bufferSize);
+ return new Chatroom(id, name, chatroomService, bufferSize);
}
}
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.domain.MessageMutationException;
-import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
+import de.juplo.kafka.chat.backend.domain.ChatroomService;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
-public class InMemoryPersistenceStrategy implements PersistenceStrategy
+public class InMemoryChatroomService implements ChatroomService
{
private final LinkedHashMap<Message.MessageKey, Message> messages;
- public InMemoryPersistenceStrategy(LinkedHashMap<Message.MessageKey, Message> messages)
+ public InMemoryChatroomService(LinkedHashMap<Message.MessageKey, Message> messages)
{
this.messages = messages;
}
- public InMemoryPersistenceStrategy(Flux<Message> messageFlux)
+ public InMemoryChatroomService(Flux<Message> messageFlux)
{
- log.debug("Creating InMemoryPersistenceStrategy");
+ log.debug("Creating InMemoryChatroomService");
messages = new LinkedHashMap<>();
messageFlux.subscribe(message -> persistMessage(message));
}
.log()
.map(chatroomTo ->
{
- InMemoryPersistenceStrategy strategy =
- new InMemoryPersistenceStrategy(readMessages(chatroomTo));
- return chatroomFactory.restoreChatroom(chatroomTo.getId(), chatroomTo.getName(), strategy);
+ InMemoryChatroomService chatroomService =
+ new InMemoryChatroomService(readMessages(chatroomTo));
+ return chatroomFactory.restoreChatroom(
+ chatroomTo.getId(),
+ chatroomTo.getName(),
+ chatroomService);
});
}