package de.juplo.kafka.chat.backend.api;
import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
import lombok.RequiredArgsConstructor;
-import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
}
public Mono<MessageTo> put(
- Chatroom chatroom,
+ ChatRoom chatroom,
String username,
Long messageId,
String text)
}
private Mono<MessageTo> get(
- Chatroom chatroom,
+ ChatRoom chatroom,
String username,
Long messageId)
{
.orElseThrow(() -> new UnknownChatroomException(chatroomId));
}
- private Flux<ServerSentEvent<MessageTo>> listen(Chatroom chatroom)
+ private Flux<ServerSentEvent<MessageTo>> listen(ChatRoom chatroom)
{
return chatroom
.listen()
package de.juplo.kafka.chat.backend.api;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import lombok.Data;
import java.util.UUID;
private String name;
- public static ChatroomTo from(Chatroom chatroom)
+ public static ChatroomTo from(ChatRoom chatroom)
{
ChatroomTo info = new ChatroomTo();
info.id = chatroom.getId();
@Slf4j
public class ChatHome
{
- private final Map<UUID, Chatroom> chatrooms;
+ private final Map<UUID, ChatRoom> chatrooms;
private final ChatHomeService service;
- public ChatHome(ChatHomeService service, Flux<Chatroom> chatroomFlux)
+ public ChatHome(ChatHomeService service, Flux<ChatRoom> chatroomFlux)
{
log.debug("Creating ChatHome with factory: {}", service);
this.service = service;
chatroomFlux.subscribe(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
}
- public Chatroom createChatroom(String name)
+ public ChatRoom createChatroom(String name)
{
- Chatroom chatroom = service.createChatroom(UUID.randomUUID(), name);
+ ChatRoom chatroom = service.createChatroom(UUID.randomUUID(), name);
chatrooms.put(chatroom.getId(), chatroom);
return chatroom;
}
- public Optional<Chatroom> getChatroom(UUID id)
+ public Optional<ChatRoom> getChatroom(UUID id)
{
return Optional.ofNullable(chatrooms.get(id));
}
- public Stream<Chatroom> list()
+ public Stream<ChatRoom> list()
{
return chatrooms.values().stream();
}
public interface ChatHomeService<T extends ChatroomService>
{
- Chatroom createChatroom(UUID id, String name);
+ ChatRoom createChatroom(UUID id, String name);
}
--- /dev/null
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+import java.time.LocalDateTime;
+import java.util.*;
+
+
+@Slf4j
+public class ChatRoom
+{
+ @Getter
+ private final UUID id;
+ @Getter
+ private final String name;
+ private final ChatroomService chatroomService;
+ private final int bufferSize;
+ private Sinks.Many<Message> sink;
+
+ public ChatRoom(
+ UUID id,
+ String name,
+ ChatroomService chatroomService,
+ int bufferSize)
+ {
+ this.id = id;
+ this.name = name;
+ this.chatroomService = chatroomService;
+ this.bufferSize = bufferSize;
+ this.sink = createSink();
+ }
+
+
+ synchronized public Mono<Message> addMessage(
+ Long id,
+ LocalDateTime timestamp,
+ String user,
+ String text)
+ {
+ return chatroomService
+ .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
+ .doOnNext(message ->
+ {
+ Sinks.EmitResult result = sink.tryEmitNext(message);
+ if (result.isFailure())
+ {
+ log.warn("Emitting of message failed with {} for {}", result.name(), message);
+ }
+ });
+ }
+
+
+ public Mono<Message> getMessage(String username, Long messageId)
+ {
+ Message.MessageKey key = Message.MessageKey.of(username, messageId);
+ return chatroomService.getMessage(key);
+ }
+
+ synchronized public Flux<Message> listen()
+ {
+ return sink
+ .asFlux()
+ .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
+ }
+
+ public Flux<Message> getMessages()
+ {
+ return getMessages(0, Long.MAX_VALUE);
+ }
+
+ public Flux<Message> getMessages(long first, long last)
+ {
+ return chatroomService.getMessages(first, last);
+ }
+
+ private Sinks.Many<Message> createSink()
+ {
+ return Sinks
+ .many()
+ .multicast()
+ .onBackpressureBuffer(bufferSize);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
-
-import java.time.LocalDateTime;
-import java.util.*;
-
-
-@Slf4j
-public class Chatroom
-{
- @Getter
- private final UUID id;
- @Getter
- private final String name;
- private final ChatroomService chatroomService;
- private final int bufferSize;
- private Sinks.Many<Message> sink;
-
- public Chatroom(
- UUID id,
- String name,
- ChatroomService chatroomService,
- int bufferSize)
- {
- this.id = id;
- this.name = name;
- this.chatroomService = chatroomService;
- this.bufferSize = bufferSize;
- this.sink = createSink();
- }
-
-
- synchronized public Mono<Message> addMessage(
- Long id,
- LocalDateTime timestamp,
- String user,
- String text)
- {
- return chatroomService
- .persistMessage(Message.MessageKey.of(user, id), timestamp, text)
- .doOnNext(message ->
- {
- Sinks.EmitResult result = sink.tryEmitNext(message);
- if (result.isFailure())
- {
- log.warn("Emitting of message failed with {} for {}", result.name(), message);
- }
- });
- }
-
-
- public Mono<Message> getMessage(String username, Long messageId)
- {
- Message.MessageKey key = Message.MessageKey.of(username, messageId);
- return chatroomService.getMessage(key);
- }
-
- synchronized public Flux<Message> listen()
- {
- return sink
- .asFlux()
- .doOnCancel(() -> sink = createSink()); // Sink hast to be recreated on auto-cancel!
- }
-
- public Flux<Message> getMessages()
- {
- return getMessages(0, Long.MAX_VALUE);
- }
-
- public Flux<Message> getMessages(long first, long last)
- {
- return chatroomService.getMessages(first, last);
- }
-
- private Sinks.Many<Message> createSink()
- {
- return Sinks
- .many()
- .multicast()
- .onBackpressureBuffer(bufferSize);
- }
-}
package de.juplo.kafka.chat.backend.persistence;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import lombok.RequiredArgsConstructor;
@Override
- public Chatroom createChatroom(UUID id, String name)
+ public ChatRoom createChatroom(UUID id, String name)
{
InMemoryChatroomService chatroomService =
new InMemoryChatroomService(new LinkedHashMap<>());
- return new Chatroom(id, name, chatroomService, bufferSize);
+ return new ChatRoom(id, name, chatroomService, bufferSize);
}
- public Chatroom restoreChatroom(
+ public ChatRoom restoreChatroom(
UUID id,
String name,
InMemoryChatroomService chatroomService)
{
- return new Chatroom(id, name, chatroomService, bufferSize);
+ return new ChatRoom(id, name, chatroomService, bufferSize);
}
}
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.chat.backend.api.ChatroomTo;
import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Override
- public void writeChatrooms(Flux<Chatroom> chatroomFlux)
+ public void writeChatrooms(Flux<ChatRoom> chatroomFlux)
{
Path path = chatroomsPath();
log.info("Writing chatrooms to {}", path);
}
@Override
- public Flux<Chatroom> readChatrooms()
+ public Flux<ChatRoom> readChatrooms()
{
JavaType type = mapper.getTypeFactory().constructType(ChatroomTo.class);
return Flux
package de.juplo.kafka.chat.backend.persistence;
import de.juplo.kafka.chat.backend.api.ChatroomTo;
-import de.juplo.kafka.chat.backend.domain.Chatroom;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.Message;
import reactor.core.publisher.Flux;
public interface StorageStrategy
{
- void writeChatrooms(Flux<Chatroom> chatroomFlux);
- Flux<Chatroom> readChatrooms();
+ void writeChatrooms(Flux<ChatRoom> chatroomFlux);
+ Flux<ChatRoom> readChatrooms();
void writeMessages(ChatroomTo chatroomTo, Flux<Message> messageFlux);
Flux<Message> readMessages(ChatroomTo chatroomTo);
}