WIP:compiles!
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomActiveMessageHandlingStrategy.java
index 69b6fe9..04564b9 100644 (file)
@@ -1,9 +1,8 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.RequiredArgsConstructor;
-import lombok.extern.log4j.Log4j;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -15,9 +14,10 @@ import java.util.UUID;
 
 
 @RequiredArgsConstructor
-@Log4j
+@Slf4j
 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 {
+  private final KafkaChatRoomService kafkaChatRoomService;
   private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
   private final UUID chatRoomId;
@@ -26,7 +26,7 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 
 
   @Override
-  public Mono<Message> persistMessage(
+  public Mono<Message> handleMessage(
       Message.MessageKey key,
       LocalDateTime timestamp,
       String text)
@@ -48,8 +48,8 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
           // On successful send
           {
             // Emit new message
-            message = new Message(key, metadata.offset(), timestamp, text);
-            messages.put(message.getKey(), message);
+            Message message = new Message(key, metadata.offset(), timestamp, text);
+            kafkaChatRoomService.addMessage(message);
           }
 
           sink.success();
@@ -62,9 +62,4 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
       }));
     });
   }
-
-  @Override
-  public MessageHandlingStrategy handleMessage(Message message)
-  {
-  }
 }