import de.juplo.kafka.chat.backend.domain.ChatRoomService;
import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
+@RequiredArgsConstructor
public class KafkaChatRoomService implements ChatRoomService
{
- private final LinkedHashMap<Message.MessageKey, Message> messages;
+ private final Producer<String, MessageTo> producer;
+ private final TopicPartition tp;
+ private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+ private long offset = 0l;
- public KafkaChatRoomService(Flux<Message> messageFlux)
- {
- log.debug("Creating KafkaChatRoomService");
- messages = new LinkedHashMap<>();
- messageFlux.subscribe(message -> messages.put(message.getKey(), message));
- }
@Override
public Message persistMessage(
LocalDateTime timestamp,
String text)
{
+
+ Mono.error(() -> new MessageMutationException(existing, text)));
Message message = new Message(key, (long)messages.size(), timestamp, text);
messages.put(message.getKey(), message);
return message;