X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatRoomService.java;h=16ed3a7039a2cb176dfbcfe9068b29c518e9c34e;hb=915ed8f85459da3c95f86b6351a3d7129668bc8e;hp=ed155df873a7344d7fe46159bee8b4244b935e2e;hpb=035668bee4f02c4c70f43826026b40f81e3dd672;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index ed155df8..16ed3a70 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -1,11 +1,8 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -30,42 +27,14 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text); + return kafkaChatHomeService + .sendMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); } - /** - * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über - * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)} - */ - protected void addMessage(Message message) throws MessageMutationException + public void persistMessage(Message message) { - Message existingMessage = messages.get(message.getKey()); - - // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel - // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht, - // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde, - // fängt dies bereits der ChatRoom ab. - // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden, - // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ?? - // doppelt aufschlägt... - if (existingMessage == null) - { - messages.put(message.getKey(), message); - } - else - { - if (!existingMessage.getMessageText().equals(message.getMessageText())) - { - throw new MessageMutationException(existingMessage, message.getMessageText()); - } - - // Warn and emit existing message - log.warn( - "Keeping existing message with {}@{} for {}", - existingMessage.getSerialNumber(), - existingMessage.getTimestamp(), - existingMessage.getKey()); - } + messages.put(message.getKey(), message) } @Override