WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
10 import org.apache.kafka.common.TopicPartition;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
13
14 import java.util.*;
15
16
17 @Slf4j
18 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
19 {
20   private final Consumer<String, MessageTo> consumer;
21   private final String topic;
22   private final long[] offsets;
23   private final MessageHandler[] handlers;
24   private final Map<UUID, ChatRoom>[] chatrooms;
25
26
27   public KafkaChatHomeService(
28     Consumer<String, MessageTo> consumer,
29     String topic,
30     int numShards)
31   {
32     log.debug("Creating KafkaChatHomeService");
33     this.consumer = consumer;
34     this.topic = topic;
35     this.offsets = new long[numShards];
36     this.handlers = new MessageHandler[numShards];
37     for (int i=0; i< numShards; i++)
38     {
39       this.offsets[i] = 0l;
40       this.handlers[i] = new MessageHandler(new TopicPartition(topic, i));
41     }
42     this.chatrooms = new Map[numShards];
43   }
44
45
46   @Override
47   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
48   {
49     consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
50     {
51       if (!tp.topic().equals(topic))
52       {
53         log.warn("Ignoring partition from unwanted topic: {}", tp);
54         return;
55       }
56
57       int partition = tp.partition();
58       long unseenOffset = offsets[partition];
59
60       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
61       handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
62     });
63   }
64
65   @Override
66   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
67   {
68     partitions.forEach(tp ->
69     {
70       if (!tp.topic().equals(topic))
71       {
72         log.warn("Ignoring partition from unwanted topic: {}", tp);
73         return;
74       }
75
76       int partition = tp.partition();
77       long unseenOffset = offsets[partition];
78
79       log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
80     });
81     log.info("Revoked partitions: {}", partitions);
82   }
83
84   @Override
85   public void onPartitionsLost(Collection<TopicPartition> partitions)
86   {
87     log.info("Revoked partitions: {}", partitions);
88   }
89
90   private void foo()
91   {
92     Set<Integer> owned = Arrays
93       .stream(ownedShards)
94       .collect(
95         () -> new HashSet<>(),
96         (set, i) -> set.add(i),
97         (a, b) -> a.addAll(b));
98     for (int shard = 0; shard < numShards; shard++)
99     {
100       chatrooms[shard] = owned.contains(shard)
101         ? new HashMap<>()
102         : null;
103     }
104     chatroomFlux
105       .filter(chatRoom ->
106       {
107         if (owned.contains(chatRoom.getShard()))
108         {
109           return true;
110         }
111         else
112         {
113           log.info("Ignoring not owned chat-room {}", chatRoom);
114           return false;
115         }
116       })
117       .toStream()
118       .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
119   }
120
121   @Override
122   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
123   {
124     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
125     return Mono.just(chatRoom);
126   }
127
128   @Override
129   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
130   {
131     return Mono.justOrEmpty(chatrooms[shard].get(id));
132   }
133
134   @Override
135   public Flux<ChatRoom> getChatRooms(int shard)
136   {
137     return Flux.fromStream(chatrooms[shard].values().stream());
138   }
139
140
141   class MessageHandler
142   {
143
144   }
145
146   interface MessageHandlingStrategy
147   {
148     MessageHandlingStrategy handleMessage(Message message);
149   }
150
151
152   @RequiredArgsConstructor
153   class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
154   {
155     private final TopicPartition tp;
156
157     @Override
158     public MessageHandlingStrategy handleMessage(Message message)
159     {
160       log.warn("Not handling message {} for partition {}", message, tp);
161       return this;
162     }
163   }
164
165   class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
166   {
167     private final TopicPartition tp;
168     private final long currentOffset;
169     private final long unseenOffset;
170
171     ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
172     {
173       this.tp = tp;
174       this.currentOffset = currentOffset;
175       this.unseenOffset = unseenOffset;
176
177       consumer.seek(tp, unseenOffset);
178     }
179
180     @Override
181     public MessageHandlingStrategy handleMessage(Message message)
182     {
183       // todo
184       return this;
185     }
186   }
187
188   @RequiredArgsConstructor
189   class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
190   {
191     private final TopicPartition tp;
192
193     @Override
194     public MessageHandlingStrategy handleMessage(Message message)
195     {
196       chatrooms[tp.partition()].put()
197       return this;
198     }
199   }
200 }