NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index c4737a1..ed155df 100644 (file)
@@ -3,52 +3,34 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.LocalDateTime;
-import java.time.ZoneOffset;
 import java.util.LinkedHashMap;
 import java.util.UUID;
 
 
+@RequiredArgsConstructor
 @Slf4j
 public class KafkaChatRoomService implements ChatRoomService
 {
-  private final Producer<String, MessageTo> producer;
-  private final TopicPartition tp;
+  private final KafkaChatHomeService kafkaChatHomeService;
   private final UUID chatRoomId;
-  private final ZoneOffset zoneOffset;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private volatile MessageHandlingStrategy strategy;
-
-
-  public KafkaChatRoomService(
-      Producer<String, MessageTo> producer,
-      TopicPartition tp,
-      UUID chatRoomId,
-      ZoneOffset zoneOffset)
-  {
-    this.producer = producer;
-    this.tp = tp;
-    this.chatRoomId = chatRoomId;
-    this.zoneOffset = zoneOffset;
-    this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
-  }
-
 
   @Override
-  synchronized public Mono<Message> persistMessage(
+  public Mono<Message> persistMessage(
     Message.MessageKey key,
     LocalDateTime timestamp,
     String text)
   {
-    return strategy.handleMessage(key, timestamp, text);
+    return kafkaChatHomeService.sendMessage(chatRoomId, key, timestamp, text);
   }
 
   /**