7149603a76f1c856c06ee76d798cb83e912765e1
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.common.TopicPartition;
6 import reactor.core.publisher.Mono;
7
8 import java.time.LocalDateTime;
9
10
11 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
12 {
13   private final Consumer consumer;
14   private final TopicPartition tp;
15   private final long currentOffset;
16   private final long unseenOffset;
17
18   /**
19    * Wird nicht benötigt!!!
20    * Derzeit?
21    * @param consumer
22    * @param tp
23    * @param currentOffset
24    * @param unseenOffset
25    */
26   ChatRoomLoadingMessageHandlingStrategy(
27       Consumer consumer,
28       TopicPartition tp,
29       long currentOffset,
30       long unseenOffset)
31   {
32     this.consumer = consumer;
33     this.tp = tp;
34     this.currentOffset = currentOffset;
35     this.unseenOffset = unseenOffset;
36
37     consumer.seek(tp, unseenOffset);
38   }
39
40   @Override
41   public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
42   {
43     // TODO
44     return null;
45   }
46 }