af99b8092956c9d2d95e71809210ad48e42023c5
[demos/kafka/chat] /
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.Message;
4 import org.apache.kafka.clients.consumer.Consumer;
5 import org.apache.kafka.common.TopicPartition;
6
7
8 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
9 {
10   private final Consumer consumer;
11   private final TopicPartition tp;
12   private final long currentOffset;
13   private final long unseenOffset;
14
15   ChatRoomLoadingMessageHandlingStrategy(
16       Consumer consumer,
17       TopicPartition tp,
18       long currentOffset,
19       long unseenOffset)
20   {
21     this.consumer = consumer;
22     this.tp = tp;
23     this.currentOffset = currentOffset;
24     this.unseenOffset = unseenOffset;
25
26     consumer.seek(tp, unseenOffset);
27   }
28
29   @Override
30   public MessageHandlingStrategy handleMessage(Message message)
31   {
32     // todo
33     return this;
34   }
35 }