1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.common.TopicPartition;
6 import reactor.core.publisher.Mono;
8 import java.time.LocalDateTime;
11 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
13 private final Consumer consumer;
14 private final TopicPartition tp;
15 private final long currentOffset;
16 private final long unseenOffset;
19 * Not needed!!!
23 * @param currentOffset
26 ChatRoomLoadingMessageHandlingStrategy(
32 this.consumer = consumer;
34 this.currentOffset = currentOffset;
35 this.unseenOffset = unseenOffset;
37 consumer.seek(tp, unseenOffset);
41 public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)