WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomActiveMessageHandlingStrategy.java
index 69b6fe9..49ace7a 100644 (file)
@@ -18,6 +18,7 @@ import java.util.UUID;
 @Log4j
 class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
 {
+  private final KafkaChatRoomService kafkaChatRoomService;
   private final Producer<String, MessageTo> producer;
   private final TopicPartition tp;
   private final UUID chatRoomId;
@@ -48,8 +49,8 @@ class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
           // On successful send
           {
             // Emit new message
-            message = new Message(key, metadata.offset(), timestamp, text);
-            messages.put(message.getKey(), message);
+            Message message = new Message(key, metadata.offset(), timestamp, text);
+            kafkaChatRoomService.addMessage(message);
           }
 
           sink.success();