c7a3c8b32051e898fd2d2806af8ca249d42c9906
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.common.TopicPartition;
6 import reactor.core.publisher.Mono;
7
8 import java.time.LocalDateTime;
9
10
11 /**
12  * TODO:
13  * Not used anywhere
14  * {@link ChatRoomActiveMessageHandlingStrategy} is the only active strategy at the moment.
15  * Refactor?
16  */
17 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
18 {
19   private final Consumer consumer;
20   private final TopicPartition tp;
21   private final long currentOffset;
22   private final long unseenOffset;
23
24   ChatRoomLoadingMessageHandlingStrategy(
25       Consumer consumer,
26       TopicPartition tp,
27       long currentOffset,
28       long unseenOffset)
29   {
30     this.consumer = consumer;
31     this.tp = tp;
32     this.currentOffset = currentOffset;
33     this.unseenOffset = unseenOffset;
34
35     consumer.seek(tp, unseenOffset);
36   }
37
38   @Override
39   public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
40   {
41     // TODO
42     return null;
43   }
44 }