NG
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index 981c11f..f802234 100644 (file)
@@ -1,52 +1,50 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
+import java.util.UUID;
 
 
-@Slf4j
 @RequiredArgsConstructor
+@Slf4j
 public class KafkaChatRoomService implements ChatRoomService
 {
-  private final Producer<String, MessageTo> producer;
-  private final TopicPartition tp;
+  private final ChatMessageChannel chatMessageChannel;
+  private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private long offset = 0l;
-
 
   @Override
-  public Message persistMessage(
+  public Mono<Message> persistMessage(
     Message.MessageKey key,
     LocalDateTime timestamp,
     String text)
   {
-    
-    Mono.error(() -> new MessageMutationException(existing, text)));
-    Message message = new Message(key, (long)messages.size(), timestamp, text);
+    return chatMessageChannel
+        .sendChatMessage(chatRoomId, key, timestamp, text)
+        .doOnSuccess(message -> persistMessage(message));
+  }
+
+  void persistMessage(Message message)
+  {
     messages.put(message.getKey(), message);
-    return message;
   }
 
   @Override
-  public Mono<Message> getMessage(Message.MessageKey key)
+  synchronized public Mono<Message> getMessage(Message.MessageKey key)
   {
     return Mono.fromSupplier(() -> messages.get(key));
   }
 
   @Override
-  public Flux<Message> getMessages(long first, long last)
+  synchronized public Flux<Message> getMessages(long first, long last)
   {
     return Flux.fromStream(messages
       .values()