1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.common.TopicPartition;
6 import reactor.core.publisher.Mono;
8 import java.time.LocalDateTime;
11 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
13 private final Consumer consumer;
14 private final TopicPartition tp;
15 private final long currentOffset;
16 private final long unseenOffset;
18 ChatRoomLoadingMessageHandlingStrategy(
24 this.consumer = consumer;
26 this.currentOffset = currentOffset;
27 this.unseenOffset = unseenOffset;
29 consumer.seek(tp, unseenOffset);
33 public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)