@Slf4j
public class KafkaChatRoomService implements ChatRoomService
{
- private final KafkaChatHomeService kafkaChatHomeService;
+ private final ChatMessageChannel chatMessageChannel;
private final UUID chatRoomId;
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
LocalDateTime timestamp,
String text)
{
- return kafkaChatHomeService
+ return chatMessageChannel
.sendMessage(chatRoomId, key, timestamp, text)
.doOnSuccess(message -> persistMessage(message));
}
- public void persistMessage(Message message)
+ void persistMessage(Message message)
{
- messages.put(message.getKey(), message)
+ messages.put(message.getKey(), message);
}
@Override
synchronized public Mono<Message> getMessage(Message.MessageKey key)
{
- // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
- // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
return Mono.fromSupplier(() -> messages.get(key));
}