1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import java.time.Instant;
10 import java.time.LocalDateTime;
11 import java.time.ZoneId;
12 import java.util.HashMap;
14 import java.util.UUID;
17 @RequiredArgsConstructor
21 private final long offsetOfFirstUnseenMessage;
22 private final ZoneId zoneId;
23 private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
27 * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
28 * all messages that belong to the partition that defines the shard this
30 * The loader signals that its work is done once the given end offset is reached.
31 * @param record A record that represents a message from one of the {@link ChatRoom}s that belong to the partition.
32 * @return {@code true} if all messages are consumed.
34 boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
36 Message.MessageKey messageKey = Message.MessageKey.of(
37 record.value().getUser(),
38 record.value().getId());
40 if (record.offset() >= offsetOfFirstUnseenMessage)
42 // All messages consumed: DONE!
44 "Ignoring unseen message {}: topic={}, partition={}, offset={}",
52 Instant timestamp = Instant.ofEpochMilli(record.timestamp());
53 LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
55 KafkaChatRoomService service = kafkaChatRoomServiceMap
56 .computeIfAbsent(record.key(), key ->
60 service.addMessage(new Message(
64 record.value().getText()