WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatHomeLoader.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8
9 import java.time.Instant;
10 import java.time.LocalDateTime;
11 import java.time.ZoneId;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.UUID;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 class ChatHomeLoader
20 {
21   private final long offsetOfFirstUnseenMessage;
22   private final ZoneId zoneId;
23   private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
24
25
26   /**
27    * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
28    * all messages, that belong to the partition, that defines the shard this
29    * service represents.
30    * The loader signals, that its work is done, if the given end offset is reached.
31    * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
32    * @return {@code true}, if all messages are consumed.
33    */
34   boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
35   {
36     Message.MessageKey messageKey = Message.MessageKey.of(
37         record.value().getUser(),
38         record.value().getId());
39
40     if (record.offset() >= offsetOfFirstUnseenMessage)
41     {
42       // All messages consumed: DONE!
43       log.trace(
44           "Ignoring unseen message {}: topic={}, partition={}, offset={}",
45           messageKey,
46           record.topic(),
47           record.partition(),
48           record.offset());
49       return true;
50     }
51
52     Instant timestamp = Instant.ofEpochMilli(record.timestamp());
53     LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
54
55     KafkaChatRoomService service = kafkaChatRoomServiceMap
56         .computeIfAbsent(record.key(), key ->
57         {
58         });
59
60     service.addMessage(new Message(
61         messageKey,
62         record.offset(),
63         time,
64         record.value().getText()
65         ));
66
67     return false;
68   }
69 }