package de.juplo.kafka.chat.backend.api;
import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
public class ChatBackendController
{
private final Map<UUID, Chatroom> chatrooms = new HashMap<>();
+ private final PersistenceStrategy persistenceStrategy;
private final Clock clock;
@PostMapping("create")
public Chatroom create(@RequestBody String name)
{
- Chatroom chatroom = new Chatroom(UUID.randomUUID(), name);
+ Chatroom chatroom = new Chatroom(UUID.randomUUID(), name, persistenceStrategy);
chatrooms.put(chatroom.getId(), chatroom);
return chatroom;
}
private final UUID id;
@Getter
private final String name;
- private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+ private final PersistenceStrategy persistence;
private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
synchronized public Mono<Message> addMessage(
String user,
String text)
{
- return persistMessage(id, timestamp, user, text)
+ return persistence
+ .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
.doOnNext(message -> sink.tryEmitNext(message).orThrow());
}
- private Mono<Message> persistMessage(
- Long id,
- LocalDateTime timestamp,
- String user,
- String text)
- {
- Message.MessageKey key = Message.MessageKey.of(user, id);
- Message message = new Message(key, (long)messages.size(), timestamp, text);
-
- Message existing = messages.get(key);
- if (existing != null)
- {
- log.info("Message with key {} already exists; {}", key, existing);
- if (!message.equals(existing))
- throw new MessageMutationException(message, existing);
- return Mono.empty();
- }
-
- messages.put(key, message);
- return Mono
- .fromSupplier(() -> message)
- .log();
- }
public Mono<Message> getMessage(String username, Long messageId)
{
- return Mono.fromSupplier(() ->
- {
- Message.MessageKey key = Message.MessageKey.of(username, messageId);
- return messages.get(key);
- });
+ return persistence.getMessage(Message.MessageKey.of(username, messageId));
}
public Flux<Message> listen()
public Flux<Message> getMessages(long first, long last)
{
- return Flux.fromStream(messages
- .values()
- .stream()
- .filter(message ->
- {
- long serial = message.getSerialNumber();
- return serial >= first && serial <= last;
- }));
+ return persistence.getMessages(first, last);
}
}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.Value;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+
+
+public interface PersistenceStrategy
+{
+ Mono<Message> persistMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text);
+
+ Mono<Message> getMessage(Message.MessageKey key);
+
+ Flux<Message> getMessages(long first, long last);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import de.juplo.kafka.chat.backend.domain.PersistenceStrategy;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class InMemoryPersistenceStrategy implements PersistenceStrategy
+{
+ private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+ @Override
+ public Mono<Message> persistMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ Message message = new Message(key, (long)messages.size(), timestamp, text);
+
+ Message existing = messages.get(key);
+ if (existing != null)
+ {
+ log.info("Message with key {} already exists; {}", key, existing);
+ if (!message.equals(existing))
+ throw new MessageMutationException(message, existing);
+ return Mono.empty();
+ }
+
+ messages.put(key, message);
+ return Mono
+ .fromSupplier(() -> message)
+ .log();
+ }
+
+ @Override
+ public Mono<Message> getMessage(Message.MessageKey key)
+ {
+ return Mono.fromSupplier(() -> messages.get(key));
+ }
+
+ @Override
+ public Flux<Message> getMessages(long first, long last)
+ {
+ return Flux.fromStream(messages
+ .values()
+ .stream()
+ .filter(message ->
+ {
+ long serial = message.getSerialNumber();
+ return serial >= first && serial <= last;
+ }));
+ }
+}