1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.common.TopicPartition;
6 import reactor.core.publisher.Mono;
8 import java.time.LocalDateTime;
14 * {@link ChatRoomActiveMessageHandlingStrategy} is the only active strategy at the moment.
17 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
19 private final Consumer consumer; // Kafka consumer supplied to the constructor, which repositions it via seek(tp, unseenOffset). NOTE(review): declared as a raw type - confirm the intended key/value types.
20 private final TopicPartition tp; // Topic partition used as the target of the constructor's seek() call.
21 private final long currentOffset; // Offset value handed to the constructor; no read of it is visible in this excerpt - presumably the position reached so far, TODO confirm.
22 private final long unseenOffset; // Offset the constructor seeks the consumer to on partition tp.
24 ChatRoomLoadingMessageHandlingStrategy(
30 this.consumer = consumer;
32 this.currentOffset = currentOffset;
33 this.unseenOffset = unseenOffset;
35 consumer.seek(tp, unseenOffset);
39 public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)