import reactor.core.publisher.Mono;
import java.time.Clock;
-import java.time.Duration;
import java.time.LocalDateTime;
-import java.time.LocalTime;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
return chatrooms.get(chatroomId);
}
- @PutMapping("post/{chatroomId}/{username}/{messageId}")
- public Mono<MessageTo> post(
+ @PutMapping("put/{chatroomId}/{username}/{messageId}")
+ public Mono<MessageTo> put(
@PathVariable UUID chatroomId,
@PathVariable String username,
- @PathVariable UUID messageId,
+ @PathVariable Long messageId,
@RequestBody String text)
{
+ Chatroom chatroom = chatrooms.get(chatroomId);
return
- chatrooms
- .get(chatroomId)
+ chatroom
.addMessage(
messageId,
LocalDateTime.now(clock),
username,
text)
+ .switchIfEmpty(chatroom.getMessage(username, messageId))
+ .map(message -> MessageTo.from(message));
+ }
+
+ @GetMapping("get/{chatroomId}/{username}/{messageId}")
+ public Mono<MessageTo> get(
+ @PathVariable UUID chatroomId,
+ @PathVariable String username,
+ @PathVariable Long messageId)
+ {
+ return
+ chatrooms
+ .get(chatroomId)
+ .getMessage(username, messageId)
.map(message -> MessageTo.from(message));
}
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import java.time.LocalDateTime;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.stream.Stream;
private final UUID id;
@Getter
private final String name;
- private final List<Message> messages = new LinkedList<>();
+ private final LinkedHashMap<MessageKey, Message> messages = new LinkedHashMap<>();
private final Sinks.Many<Message> sink = Sinks.many().multicast().onBackpressureBuffer();
synchronized public Mono<Message> addMessage(
- UUID id,
+ Long id,
LocalDateTime timestamp,
String user,
String text)
}
private Mono<Message> persistMessage(
- UUID id,
+ Long id,
LocalDateTime timestamp,
String user,
String text)
{
Message message = new Message(id, (long)messages.size(), timestamp, user, text);
- messages.add(message);
+
+ MessageKey key = new MessageKey(user, id);
+ Message existing = messages.get(key);
+ if (existing != null)
+ {
+ log.info("Message with key {} already exists; {}", key, existing);
+ if (!message.equals(existing))
+ throw new IllegalArgumentException("Messages are imutable!");
+ return Mono.empty();
+ }
+
+ messages.put(key, message);
return Mono
.fromSupplier(() -> message)
.log();
}
+ public Mono<Message> getMessage(String username, Long messageId)
+ {
+ return Mono.fromSupplier(() ->
+ {
+ MessageKey key = MessageKey.of(username, messageId);
+ return messages.get(key);
+ });
+ }
+
public Flux<Message> listen()
{
return sink.asFlux();
public Stream<Message> getMessages(long firstMessage)
{
- return messages.stream().filter(message -> message.getSerialNumber() >= firstMessage);
+ return messages
+ .values()
+ .stream()
+ .filter(message -> message.getSerialNumber() >= firstMessage);
+ }
+
+
+ @Value(staticConstructor = "of")
+ static class MessageKey
+ {
+ String username;
+ Long messageId;
}
}