77fe6425be6eea448629a9096bafcf3a3fc39619
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.common.TopicPartition;
6 import reactor.core.publisher.Mono;
7
8 import java.time.LocalDateTime;
9
10
11 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
12 {
13   private final Consumer consumer;
14   private final TopicPartition tp;
15   private final long currentOffset;
16   private final long unseenOffset;
17
18   ChatRoomLoadingMessageHandlingStrategy(
19       Consumer consumer,
20       TopicPartition tp,
21       long currentOffset,
22       long unseenOffset)
23   {
24     this.consumer = consumer;
25     this.tp = tp;
26     this.currentOffset = currentOffset;
27     this.unseenOffset = unseenOffset;
28
29     consumer.seek(tp, unseenOffset);
30   }
31
32   @Override
33   public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
34   {
35     // TODO
36     return null;
37   }
38 }