WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatHomeLoader.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import lombok.Getter;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.common.TopicPartition;
11
12 import java.time.Instant;
13 import java.time.LocalDateTime;
14 import java.time.ZoneId;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.UUID;
18
19
20 @RequiredArgsConstructor
21 @Slf4j
22 class ChatHomeLoader
23 {
24   private final Producer<String, MessageTo> producer;
25   private final long offsetOfFirstUnseenMessage;
26   private final ZoneId zoneId;
27   @Getter
28   private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
29
30
31   /**
32    * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
33    * all messages, that belong to the partition, that defines the shard this
34    * service represents.
35    * The loader signals, that its work is done, if the given end offset is reached.
36    * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
37    * @return {@code true}, if all messages are consumed.
38    */
39   boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
40   {
41     TopicPartition topicPartition =
42         new TopicPartition(record.topic(), record.partition());
43     Message.MessageKey messageKey = Message.MessageKey.of(
44         record.value().getUser(),
45         record.value().getId());
46
47     if (record.offset() >= offsetOfFirstUnseenMessage)
48     {
49       // All messages consumed: DONE!
50       log.trace(
51           "Ignoring unseen message {} on {}, offset={}",
52           messageKey,
53           topicPartition,
54           record.offset());
55       return true;
56     }
57
58     Instant timestamp = Instant.ofEpochMilli(record.timestamp());
59     LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
60
61     KafkaChatRoomService service = kafkaChatRoomServiceMap
62         .computeIfAbsent(record.key(), key ->
63             new KafkaChatRoomService(producer, topicPartition));
64
65     service.addMessage(new Message(
66         messageKey,
67         record.offset(),
68         time,
69         record.value().getText()));
70
71     return false;
72   }
73 }