From: Kai Moritz Date: Sat, 15 Apr 2023 09:16:08 +0000 (+0200) Subject: NEU X-Git-Tag: kafkadata~24 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3dcdfa896eaecf9802fdc2af4e2bd4951b0f5f6a;p=demos%2Fkafka%2Fchat NEU --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index cffc0ad0..b9463095 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -80,6 +80,11 @@ public class ChatRoom extends ChatRoomInfo } + public ChatRoomService getChatRoomService() + { + return service; + } + public Mono getMessage(String username, Long messageId) { Message.MessageKey key = Message.MessageKey.of(username, messageId); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 5133d1a6..f105902c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -137,11 +137,13 @@ public class KafkaChatHomeService implements ChatHomeService, Runnable, Consumer UUID chatRoomId = UUID.fromString(record.key()); MessageTo messageTo = record.value(); ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId); - Mono result = chatRoom.addMessage( - messageTo.getId(), - messageTo.getUser(), - messageTo.getText()); - result.block(). + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); + kafkaChatRoomService.persistMessage(message); } } else diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index ed155df8..3a8c2c6f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -30,42 +30,14 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text); + return kafkaChatHomeService + .sendMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); } - /** - * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über - * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)} - */ - protected void addMessage(Message message) throws MessageMutationException + public void persistMessage(Message message) { - Message existingMessage = messages.get(message.getKey()); - - // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel - // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht, - // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde, - // fängt dies bereits der ChatRoom ab. - // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden, - // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ?? - // doppelt aufschlägt... - if (existingMessage == null) - { - messages.put(message.getKey(), message); - } - else - { - if (!existingMessage.getMessageText().equals(message.getMessageText())) - { - throw new MessageMutationException(existingMessage, message.getMessageText()); - } - - // Warn and emit existing message - log.warn( - "Keeping existing message with {}@{} for {}", - existingMessage.getSerialNumber(), - existingMessage.getTimestamp(), - existingMessage.getKey()); - } + messages.put(message.getKey(), message) } @Override