NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
index 37c4e50..f036efe 100644 (file)
@@ -5,6 +5,7 @@ import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -21,8 +22,6 @@ public class KafkaChatRoomService implements ChatRoomService
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
 
-  private volatile MessageHandlingStrategy strategy;
-
 
   public KafkaChatRoomService(
       Producer<String, MessageTo> producer,
@@ -30,7 +29,6 @@ public class KafkaChatRoomService implements ChatRoomService
   {
     this.producer = producer;
     this.tp = tp;
-    this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp);
   }
 
 
@@ -40,7 +38,36 @@ public class KafkaChatRoomService implements ChatRoomService
     LocalDateTime timestamp,
     String text)
   {
-    return strategy.handleMessage(key, timestamp, text);
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              timestamp.toEpochSecond(zoneOffset),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send
+          {
+            // Emit new message
+            Message message = new Message(key, metadata.offset(), timestamp, text);
+            kafkaChatRoomService.addMessage(message);
+          }
+
+          sink.success();
+        }
+        else
+        {
+          // On send-failure
+          sink.error(exception);
+        }
+      }));
+    });
   }
 
   /**