WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatRoomService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.domain.MessageMutationException;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
12
13 import java.time.LocalDateTime;
14 import java.util.LinkedHashMap;
15
16
17 @Slf4j
18 @RequiredArgsConstructor
19 public class KafkaChatRoomService implements ChatRoomService
20 {
21   private final Producer<String, MessageTo> producer;
22   private final TopicPartition tp;
23
24   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
25
26   private long offset = 0l;
27
28
29   @Override
30   public Message persistMessage(
31     Message.MessageKey key,
32     LocalDateTime timestamp,
33     String text)
34   {
35     
36     Mono.error(() -> new MessageMutationException(existing, text)));
37     Message message = new Message(key, (long)messages.size(), timestamp, text);
38     messages.put(message.getKey(), message);
39     return message;
40   }
41
42   @Override
43   public Mono<Message> getMessage(Message.MessageKey key)
44   {
45     return Mono.fromSupplier(() -> messages.get(key));
46   }
47
48   @Override
49   public Flux<Message> getMessages(long first, long last)
50   {
51     return Flux.fromStream(messages
52       .values()
53       .stream()
54       .filter(message ->
55       {
56         long serial = message.getSerialNumber();
57         return serial >= first && serial <= last;
58       }));
59   }
60 }