WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatHomeLoader.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8
9 import java.time.Instant;
10 import java.time.LocalDateTime;
11 import java.time.ZoneId;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.UUID;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 class ChatHomeLoader
20 {
21   private final long offsetOfFirstNewMessage;
22   private final ZoneId zoneId;
23   private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
24
25
26   /**
27    * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
28    * all messages, that belong to the partition, that defines the shard this
29    * service represents.
30    * The loader signals, that its work is done, if the given end offset is reached.
31    * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
32    * @return {@code true}, if all messages are consumed.
33    */
34   boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
35   {
36     if (record.offset() >= offsetOfFirstNewMessage)
37     {
38       // All messages consumed: DONE!
39       log.debug("I");
40       return true;
41     }
42
43     Instant timestamp = Instant.ofEpochMilli(record.timestamp());
44     LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
45
46     KafkaChatRoomService service = kafkaChatRoomServiceMap
47         .computeIfAbsent(record.key(), key ->
48         {
49         });
50
51     service.addMessage(new Message(
52         Message.MessageKey.of(
53             record.value().getUser(),
54             record.value().getId()),
55         record.offset(),
56         time,
57         record.value().getText()
58         ));
59
60     return false;
61   }
62 }