X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatRoomService.java;h=3a8c2c6fba81715a2a3f4884f400ef5c7a8897e8;hb=3dcdfa896eaecf9802fdc2af4e2bd4951b0f5f6a;hp=4e7eb369b9d8be01a10d96ab97865949e97ebb25;hpb=b4bdcb0fca725eeb32b0215240aa9db18809f7d6;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 4e7eb369..3a8c2c6f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -3,45 +3,26 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomService; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.LinkedHashMap; import java.util.UUID; +@RequiredArgsConstructor @Slf4j public class KafkaChatRoomService implements ChatRoomService { - private final Producer producer; - private final TopicPartition tp; + private final KafkaChatHomeService kafkaChatHomeService; private final UUID chatRoomId; - private final ZoneOffset zoneOffset; private final LinkedHashMap messages = new LinkedHashMap<>(); - private MessageHandlingStrategy strategy; - - - public KafkaChatRoomService( - Producer producer, - TopicPartition tp, - UUID chatRoomId, - ZoneOffset zoneOffset) - { - this.producer = producer; - this.tp = tp; - this.chatRoomId = chatRoomId; - this.zoneOffset = zoneOffset; - this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp); - } - @Override public Mono persistMessage( @@ -49,36 +30,21 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return strategy.persistMessage(key, timestamp, text); + return kafkaChatHomeService + .sendMessage(chatRoomId, key, timestamp, text) + .doOnSuccess(message -> persistMessage(message)); } - synchronized protected void addMessage(Message message) throws MessageMutationException + public void persistMessage(Message message) { - Message existingMessage = messages.get(message.getKey()); - - if (existingMessage == null) - { - messages.put(existingMessage.getKey(), existingMessage); - } - else - { - if (!existingMessage.getMessageText().equals(message.getMessageText())) - { - throw new MessageMutationException(existingMessage, message.getMessageText()); - } - - // Warn and emit existing message - log.warn( - "Keeping existing message with {}@{} for {}", - existingMessage.getSerialNumber(), - existingMessage.getTimestamp(), - existingMessage.getKey()); - } + messages.put(message.getKey(), message) } @Override synchronized public Mono getMessage(Message.MessageKey key) { + // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen + // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird? return Mono.fromSupplier(() -> messages.get(key)); }