TMP:test -- FIX: `ChatRoomData` active/inactive
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.Getter;
9 import lombok.ToString;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRecord;
13 import org.apache.kafka.clients.consumer.ConsumerRecords;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerRecord;
16 import org.apache.kafka.common.errors.WakeupException;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
19
20 import java.net.URI;
21 import java.time.Duration;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.UUID;
25 import java.util.stream.IntStream;
26
27
28 @ToString(of = { "topic", "instanceUri" })
29 @Slf4j
30 public class InfoChannel implements Channel
31 {
32   private final String topic;
33   private final Producer<String, AbstractMessageTo> producer;
34   private final Consumer<String, AbstractMessageTo> consumer;
35   private final Duration pollingInterval;
36   private final int numShards;
37   private final String[] shardOwners;
38   private final long[] currentOffset;
39   private final long[] nextOffset;
40   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
41   private final String instanceUri;
42   private final ChannelMediator channelMediator;
43
44   private boolean running;
45   @Getter
46   private volatile ChannelState channelState = ChannelState.STARTING;
47
48
49   public InfoChannel(
50     String topic,
51     Producer<String, AbstractMessageTo> producer,
52     Consumer<String, AbstractMessageTo> infoChannelConsumer,
53     Duration pollingInterval,
54     int numShards,
55     URI instanceUri,
56     ChannelMediator channelMediator)
57   {
58     log.debug(
59         "Creating InfoChannel for topic {}",
60         topic);
61     this.topic = topic;
62     this.consumer = infoChannelConsumer;
63     this.producer = producer;
64     this.chatRoomInfo = new HashMap<>();
65
66     this.pollingInterval = pollingInterval;
67
68     this.numShards = numShards;
69     this.shardOwners = new String[numShards];
70     this.currentOffset = new long[numShards];
71     this.nextOffset = new long[numShards];
72     IntStream
73         .range(0, numShards)
74         .forEach(partition -> this.nextOffset[partition] = -1l);
75
76     this.instanceUri = instanceUri.toASCIIString();
77
78     this.channelMediator = channelMediator;
79   }
80
81
82   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
83       UUID chatRoomId,
84       String name,
85       int shard)
86   {
87     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
88     return Mono.create(sink ->
89     {
90       ProducerRecord<String, AbstractMessageTo> record =
91           new ProducerRecord<>(
92               topic,
93               Integer.toString(shard),
94               to);
95
96       producer.send(record, ((metadata, exception) ->
97       {
98         if (exception == null)
99         {
100           log.info("Successfully sent created event for chat chat-room: {}", to);
101           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
102           sink.success(chatRoomInfo);
103         }
104         else
105         {
106           // On send-failure
107           log.error(
108               "Could not send created event for chat-room (id={}, name={}): {}",
109               chatRoomId,
110               name,
111               exception);
112           sink.error(exception);
113         }
114       }));
115     });
116   }
117
118   void sendShardAssignedEvent(int shard)
119   {
120     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
121
122     ProducerRecord<String, AbstractMessageTo> record =
123         new ProducerRecord<>(
124             topic,
125             Integer.toString(shard),
126             to);
127
128     producer.send(record, ((metadata, exception) ->
129     {
130       if (metadata != null)
131       {
132         log.info("Successfully sent shard assigned event for shard: {}", shard);
133       }
134       else
135       {
136         // On send-failure
137         log.error(
138             "Could not send shard assigned event for shard {}: {}",
139             shard,
140             exception);
141         // TODO:
142         // Verhalten im Fehlerfall durchdenken!
143         // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
144         // Consumers veranlassen, so dass die nicht öffentlich Bekannte
145         // Zuständigkeit abgegeben und neu zugeordnet wird?
146         // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
147       }
148     }));
149   }
150
151   void sendShardRevokedEvent(int shard)
152   {
153     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
154
155     ProducerRecord<String, AbstractMessageTo> record =
156         new ProducerRecord<>(
157             topic,
158             Integer.toString(shard),
159             to);
160
161     producer.send(record, ((metadata, exception) ->
162     {
163       if (metadata != null)
164       {
165         log.info("Successfully sent shard revoked event for shard: {}", shard);
166       }
167       else
168       {
169         // On send-failure
170         log.error(
171             "Could not send shard revoked event for shard {}: {}",
172             shard,
173             exception);
174         // TODO:
175         // Verhalten im Fehlerfall durchdenken!
176         // Ggf. einfach egal, da die neue zuständige Instanz den
177         // nicht gelöschten Eintrag eh überschreibt?
178       }
179     }));
180   }
181
182
183   @Override
184   public void run()
185   {
186     running = true;
187
188     consumer
189         .endOffsets(consumer.assignment())
190         .entrySet()
191         .stream()
192         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
193     IntStream
194         .range(0, numShards)
195         .forEach(partition -> this.nextOffset[partition] = 0l);
196     channelState = ChannelState.LOAD_IN_PROGRESS;
197
198     while (running)
199     {
200       try
201       {
202         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
203         log.debug("Fetched {} messages", records.count());
204         for (ConsumerRecord<String, AbstractMessageTo> record : records)
205         {
206           handleMessage(record);
207           updateNextOffset(record.partition(), record.offset() + 1);
208         }
209       }
210       catch (WakeupException e)
211       {
212         log.info("Received WakeupException, exiting!");
213         channelState = ChannelState.SHUTTING_DOWN;
214         running = false;
215       }
216     }
217
218     log.info("Exiting normally");
219   }
220
221   private void updateNextOffset(int partition, long nextOffset)
222   {
223     this.nextOffset[partition] = nextOffset;
224     if (channelState == ChannelState.LOAD_IN_PROGRESS)
225     {
226       boolean loadInProgress = IntStream
227           .range(0, numShards)
228           .anyMatch(shard -> this.nextOffset[shard] < currentOffset[shard]);
229       if (!loadInProgress)
230       {
231         log.info("Loading of info completed! Resuming normal operations...");
232         channelState = ChannelState.READY;
233       }
234     }
235   }
236
237   private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
238   {
239     switch (record.value().getType())
240     {
241       case EVENT_CHATROOM_CREATED:
242         EventChatRoomCreated eventChatRoomCreated =
243             (EventChatRoomCreated) record.value();
244         createChatRoom(eventChatRoomCreated.toChatRoomInfo());
245         break;
246
247       case EVENT_SHARD_ASSIGNED:
248         EventShardAssigned eventShardAssigned =
249             (EventShardAssigned) record.value();
250         log.info(
251             "Shard {} was assigned to {}",
252             eventShardAssigned.getShard(),
253             eventShardAssigned.getUri());
254         shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
255         break;
256
257       case EVENT_SHARD_REVOKED:
258         EventShardRevoked eventShardRevoked =
259             (EventShardRevoked) record.value();
260         log.info(
261             "Shard {} was revoked from {}",
262             eventShardRevoked.getShard(),
263             eventShardRevoked.getUri());
264         shardOwners[eventShardRevoked.getShard()] = null;
265         break;
266
267       default:
268         log.debug(
269             "Ignoring message for key={} with offset={}: {}",
270             record.key(),
271             record.offset(),
272             record.value());
273     }
274   }
275
276   private void createChatRoom(ChatRoomInfo chatRoomInfo)
277   {
278     UUID chatRoomId = chatRoomInfo.getId();
279     Integer partition = chatRoomInfo.getShard();
280
281     if (this.chatRoomInfo.containsKey(chatRoomId))
282     {
283       log.warn(
284           "Ignoring existing chat-room for {}: {}",
285           partition,
286           chatRoomId);
287     }
288     else
289     {
290       log.info(
291           "Adding new chat-room for partition {}: {}",
292           partition,
293           chatRoomId);
294
295       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
296       this.channelMediator.chatRoomCreated(chatRoomInfo);
297     }
298   }
299
300   Flux<ChatRoomInfo> getChatRoomInfo()
301   {
302     return Flux.fromIterable(chatRoomInfo.values());
303   }
304
305   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
306   {
307     ChannelState capturedState = channelState;
308     if (capturedState != ChannelState.READY)
309     {
310       return Mono.error(new ChannelNotReadyException(capturedState));
311     }
312
313     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
314   }
315
316   Mono<String[]> getShardOwners()
317   {
318     return Mono.just(shardOwners);
319   }
320 }