--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+class ChatHomeLoader
+{
+ private final long offsetOfFirstNewMessage;
+ private final ZoneId zoneId;
+ private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
+
+
+ /**
+ * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
+ * all messages, that belong to the partition, that defines the shard this
+ * service represents.
+ * The loader signals, that its work is done, if the given end offset is reached.
+ * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
+ * @return {@code true}, if all messages are consumed.
+ */
+ boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
+ {
+ if (record.offset() >= offsetOfFirstNewMessage)
+ {
+ // All messages consumed: DONE!
+ log.debug("I");
+ return true;
+ }
+
+ Instant timestamp = Instant.ofEpochMilli(record.timestamp());
+ LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
+
+ KafkaChatRoomService service = kafkaChatRoomServiceMap
+ .computeIfAbsent(record.key(), key ->
+ {
+ });
+
+ service.addMessage(new Message(
+ Message.MessageKey.of(
+ record.value().getUser(),
+ record.value().getId()),
+ record.offset(),
+ time,
+ record.value().getText()
+ ));
+
+ return false;
+ }
+}
{
private final Consumer<String, MessageTo> consumer;
private final String topic;
- private final long[] offsets;
+ // private final long[] offsets; Erst mal immer alles neu einlesen
private final Map<UUID, ChatRoom>[] chatrooms;
log.debug("Creating KafkaChatHomeService");
this.consumer = consumer;
this.topic = topic;
- this.offsets = new long[numShards];
- this.chatrooms = new Map<>[numShards];
- for (int i=0; i< numShards; i++)
- {
- this.offsets[i] = 0l;
- this.chatrooms[i] = chat
- }
- this.chatrooms = new Map[numShards];
+ // this.offsets = new long[numShards];
+ // for (int i=0; i< numShards; i++)
+ // {
+ // this.offsets[i] = 0l;
+ // }
this.chatrooms = new Map[numShards];
}
}
int partition = tp.partition();
- long unseenOffset = offsets[partition];
+ long unseenOffset = 0; // offsets[partition];
log.info(
"Loading messages from partition {}: start-offset={} -> current-offset={}",
partition,
unseenOffset,
currentOffset);
+
+ consumer.seek(tp, unseenOffset);
+ chatrooms[partition]
+ .values()
+ .stream()
handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
});
}