WIP wip
authorKai Moritz <kai@juplo.de>
Tue, 24 Jan 2023 17:56:36 +0000 (18:56 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 17:52:12 +0000 (18:52 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java

index 024f9aa..981c11f 100644 (file)
@@ -2,7 +2,11 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -11,17 +15,16 @@ import java.util.LinkedHashMap;
 
 
 @Slf4j
+@RequiredArgsConstructor
 public class KafkaChatRoomService implements ChatRoomService
 {
-  private final LinkedHashMap<Message.MessageKey, Message> messages;
+  private final Producer<String, MessageTo> producer;
+  private final TopicPartition tp;
 
+  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+  private long offset = 0l;
 
-  public KafkaChatRoomService(Flux<Message> messageFlux)
-  {
-    log.debug("Creating KafkaChatRoomService");
-    messages = new LinkedHashMap<>();
-    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
-  }
 
   @Override
   public Message persistMessage(
@@ -29,6 +32,8 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
+    
+    Mono.error(() -> new MessageMutationException(existing, text)));
     Message message = new Message(key, (long)messages.size(), timestamp, text);
     messages.put(message.getKey(), message);
     return message;