+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+class ChatHomeLoader
+{
+ private final long offsetOfFirstNewMessage;
+ private final ZoneId zoneId;
+ private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
+
+
+ /**
+ * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
+ * all messages, that belong to the partition, that defines the shard this
+ * service represents.
+ * The loader signals, that its work is done, if the given end offset is reached.
+ * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
+ * @return {@code true}, if all messages are consumed.
+ */
+ boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
+ {
+ if (record.offset() >= offsetOfFirstNewMessage)
+ {
+ // All messages consumed: DONE!
+ log.debug("I");
+ return true;
+ }
+
+ Instant timestamp = Instant.ofEpochMilli(record.timestamp());
+ LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
+
+ KafkaChatRoomService service = kafkaChatRoomServiceMap
+ .computeIfAbsent(record.key(), key ->
+ {
+ });
+
+ service.addMessage(new Message(
+ Message.MessageKey.of(
+ record.value().getUser(),
+ record.value().getId()),
+ record.offset(),
+ time,
+ record.value().getText()
+ ));
+
+ return false;
+ }
+}