X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatRoomService.java;h=16ed3a7039a2cb176dfbcfe9068b29c518e9c34e;hb=915ed8f85459da3c95f86b6351a3d7129668bc8e;hp=4e7eb369b9d8be01a10d96ab97865949e97ebb25;hpb=b4bdcb0fca725eeb32b0215240aa9db18809f7d6;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 4e7eb369..16ed3a70 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -1,47 +1,25 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.LinkedHashMap; import java.util.UUID; +@RequiredArgsConstructor @Slf4j public class KafkaChatRoomService implements ChatRoomService { - private final Producer producer; - private final TopicPartition tp; + private final KafkaChatHomeService kafkaChatHomeService; private final UUID chatRoomId; - private final ZoneOffset zoneOffset; private final LinkedHashMap messages = new LinkedHashMap<>(); - private MessageHandlingStrategy strategy; - - - public KafkaChatRoomService( - Producer producer, - TopicPartition tp, - UUID chatRoomId, - ZoneOffset zoneOffset) - { - this.producer = producer; - this.tp = tp; - this.chatRoomId = chatRoomId; - this.zoneOffset = zoneOffset; - this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp); - } - @Override public Mono persistMessage( @@ -49,36 +27,21 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return strategy.persistMessage(key, timestamp, text); + return kafkaChatHomeService + .sendMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); } - synchronized protected void addMessage(Message message) throws MessageMutationException + public void persistMessage(Message message) { - Message existingMessage = messages.get(message.getKey()); - - if (existingMessage == null) - { - messages.put(existingMessage.getKey(), existingMessage); - } - else - { - if (!existingMessage.getMessageText().equals(message.getMessageText())) - { - throw new MessageMutationException(existingMessage, message.getMessageText()); - } - - // Warn and emit existing message - log.warn( - "Keeping existing message with {}@{} for {}", - existingMessage.getSerialNumber(), - existingMessage.getTimestamp(), - existingMessage.getKey()); - } + messages.put(message.getKey(), message) } @Override synchronized public Mono getMessage(Message.MessageKey key) { + // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen + // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird? return Mono.fromSupplier(() -> messages.get(key)); }