WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomActiveMessageHandlingStrategy.java
index 69b6fe9..cfd0e4e 100644 (file)
@@ -18,6 +18,7 @@ import java.util.UUID;
 @Log4j
 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 {
+  private final KafkaChatRoomService kafkaChatRoomService;
   private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
   private final UUID chatRoomId;
@@ -26,7 +27,7 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 
 
   @Override
-  public Mono<Message> persistMessage(
+  public Mono<Message> handleMessage(
       Message.MessageKey key,
       LocalDateTime timestamp,
       String text)
@@ -48,8 +49,8 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
           // On successful send
           {
             // Emit new message
-            message = new Message(key, metadata.offset(), timestamp, text);
-            messages.put(message.getKey(), message);
+            Message message = new Message(key, metadata.offset(), timestamp, text);
+            kafkaChatRoomService.addMessage(message);
           }
 
           sink.success();
@@ -62,9 +63,4 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
       }));
     });
   }
-
-  @Override
-  public MessageHandlingStrategy handleMessage(Message message)
-  {
-  }
 }