From 3dcdfa896eaecf9802fdc2af4e2bd4951b0f5f6a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 15 Apr 2023 11:16:08 +0200 Subject: [PATCH] NEU --- .../kafka/chat/backend/domain/ChatRoom.java | 5 +++ .../kafka/KafkaChatHomeService.java | 12 +++--- .../kafka/KafkaChatRoomService.java | 38 +++---------------- 3 files changed, 17 insertions(+), 38 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index cffc0ad0..b9463095 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -80,6 +80,11 @@ public class ChatRoom extends ChatRoomInfo } + public ChatRoomService getChatRoomService() + { + return service; + } + public Mono getMessage(String username, Long messageId) { Message.MessageKey key = Message.MessageKey.of(username, messageId); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 5133d1a6..f105902c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -137,11 +137,13 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer UUID chatRoomId = UUID.fromString(record.key()); MessageTo messageTo = record.value(); ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId); - Mono result = chatRoom.addMessage( - messageTo.getId(), - messageTo.getUser(), - messageTo.getText()); - result.block(). + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); + kafkaChatRoomService.persistMessage(message); } } else diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index ed155df8..3a8c2c6f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -30,42 +30,14 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text); + return kafkaChatHomeService + .sendMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); } - /** - * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über - * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)} - */ - protected void addMessage(Message message) throws MessageMutationException + public void persistMessage(Message message) { - Message existingMessage = messages.get(message.getKey()); - - // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel - // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht, - // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde, - // fängt dies bereits der ChatRoom ab. - // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden, - // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ?? - // doppelt aufschlägt... - if (existingMessage == null) - { - messages.put(message.getKey(), message); - } - else - { - if (!existingMessage.getMessageText().equals(message.getMessageText())) - { - throw new MessageMutationException(existingMessage, message.getMessageText()); - } - - // Warn and emit existing message - log.warn( - "Keeping existing message with {}@{} for {}", - existingMessage.getSerialNumber(), - existingMessage.getTimestamp(), - existingMessage.getKey()); - } + messages.put(message.getKey(), message) } @Override -- 2.20.1