problem.setDetail(e.getMessage());
- problem.setProperty("chatroom", e.getChatRoomId());
+ problem.setProperty("chatroom", e.getChatRoomInfo());
return problem;
}
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
-import java.util.UUID;
public interface ChatMessageService
{
- UUID getChatRoomId();
+ ChatRoomInfo getChatRoomInfo();
Mono<Message> persistMessage(
Message.MessageKey key,
log.warn("Emitting of message failed with {} for {}", result.name(), m);
}
})
- : Mono.error(new ChatRoomInactiveException(service.getChatRoomId())));
+ : Mono.error(new ChatRoomInactiveException(service.getChatRoomInfo())));
}
.asFlux()
.doOnCancel(() -> sink = createSink()) // Sink hast to be recreated on auto-cancel!
: Flux
- .error(new ChatRoomInactiveException(service.getChatRoomId()));
+ .error(new ChatRoomInactiveException(service.getChatRoomInfo()));
}
{
if (active)
{
- log.info("{} is already active!", service.getChatRoomId());
+ log.info("{} is already active!", service.getChatRoomInfo());
return;
}
- log.info("{} is being activated", service.getChatRoomId());
+ log.info("{} is being activated", service.getChatRoomInfo());
this.sink = createSink();
active = true;
}
public void deactivate()
{
- log.info("{} is being deactivated", service.getChatRoomId());
+ log.info("{} is being deactivated", service.getChatRoomInfo());
active = false;
sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
}
package de.juplo.kafka.chat.backend.domain.exceptions;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import lombok.Getter;
-import java.util.UUID;
-
public class ChatRoomInactiveException extends IllegalStateException
{
@Getter
- private final UUID chatRoomId;
+ private final ChatRoomInfo chatRoomInfo;
- public ChatRoomInactiveException(UUID chatRoomId)
+ public ChatRoomInactiveException(ChatRoomInfo chatRoomInfo)
{
- super("Chat-Room " + chatRoomId + " is currently inactive.");
- this.chatRoomId = chatRoomId;
+ super("Chat-Room " + chatRoomInfo + " is currently inactive.");
+ this.chatRoomInfo = chatRoomInfo;
}
}
package de.juplo.kafka.chat.backend.implementation.inmemory;
import de.juplo.kafka.chat.backend.domain.ChatMessageService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
import lombok.Getter;
public class InMemoryChatMessageService implements ChatMessageService
{
@Getter
- private final UUID chatRoomId;
+ private final ChatRoomInfo chatRoomInfo;
private final LinkedHashMap<Message.MessageKey, Message> messages;
- public InMemoryChatMessageService(UUID chatRoomId)
+ public InMemoryChatMessageService(ChatRoomInfo chatRoomInfo)
{
log.debug("Creating InMemoryChatMessageService");
- this.chatRoomId = chatRoomId;
+ this.chatRoomInfo = chatRoomInfo;
messages = new LinkedHashMap<>();
}
Mono<Void> restore(StorageStrategy storageStrategy)
{
- Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomId);
+ Flux<Message> messageFlux = storageStrategy.readChatRoomData(chatRoomInfo.getId());
return messageFlux
.doOnNext(message -> messages.put(message.getKey(), message))
})
.flatMap(info ->
{
- UUID chatRoomId = info.getId();
InMemoryChatMessageService chatMessageService =
- new InMemoryChatMessageService(chatRoomId);
+ new InMemoryChatMessageService(info);
- chatRoomInfo.put(chatRoomId, info);
+ chatRoomInfo.put(info.getId(), info);
ChatRoomData chatRoomData =
new ChatRoomData(
clock,
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
log.info("Creating ChatRoom with history-limit {}", historyLimit);
- ChatMessageService service = new InMemoryChatMessageService(id);
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+ ChatMessageService service = new InMemoryChatMessageService(chatRoomInfo);
this.chatRoomInfo.put(id, chatRoomInfo);
ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
chatRoomData.activate();
else
{
log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
- KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
+ KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomInfo);
chatRoomData = new ChatRoomData(clock, service, historyLimit);
this.chatRoomData[shard].put(chatRoomId, chatRoomData);
}
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatMessageService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
-import java.util.UUID;
@RequiredArgsConstructor
{
private final DataChannel dataChannel;
@Getter
- private final UUID chatRoomId;
+ private final ChatRoomInfo chatRoomInfo;
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
String text)
{
return dataChannel
- .sendChatMessage(chatRoomId, key, timestamp, text)
+ .sendChatMessage(chatRoomInfo.getId(), key, timestamp, text)
.doOnSuccess(message -> persistMessage(message));
}