f3150ce253985072dcbecd398c35053d36a04bf8
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
9 import lombok.Getter;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRecord;
13 import org.apache.kafka.clients.consumer.ConsumerRecords;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerRecord;
16 import org.apache.kafka.clients.producer.RecordMetadata;
17 import org.apache.kafka.common.errors.WakeupException;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
20
21 import java.net.URI;
22 import java.time.*;
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.UUID;
26 import java.util.stream.IntStream;
27
28
29 @Slf4j
30 public class InfoChannel implements Runnable
31 {
32   private final String topic;
33   private final Producer<String, AbstractMessageTo> producer;
34   private final Consumer<String, AbstractMessageTo> consumer;
35   private final Duration pollingInterval;
36   private final int numShards;
37   private final String[] shardOwners;
38   private final long[] currentOffset;
39   private final long[] nextOffset;
40   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
41   private final String instanceUri;
42   private final ChannelMediator channelMediator;
43
44   private boolean running;
45   @Getter
46   private volatile boolean loadInProgress = true;
47
48
49   public InfoChannel(
50     String topic,
51     Producer<String, AbstractMessageTo> producer,
52     Consumer<String, AbstractMessageTo> infoChannelConsumer,
53     Duration pollingInterval,
54     int numShards,
55     URI instanceUri,
56     ChannelMediator channelMediator)
57   {
58     log.debug(
59         "Creating InfoChannel for topic {}",
60         topic);
61     this.topic = topic;
62     this.consumer = infoChannelConsumer;
63     this.producer = producer;
64     this.chatRoomInfo = new HashMap<>();
65
66     this.pollingInterval = pollingInterval;
67
68     this.numShards = numShards;
69     this.shardOwners = new String[numShards];
70     this.currentOffset = new long[numShards];
71     this.nextOffset = new long[numShards];
72     IntStream
73         .range(0, numShards)
74         .forEach(partition -> this.nextOffset[partition] = -1l);
75
76     this.instanceUri = instanceUri.toASCIIString();
77
78     this.channelMediator = channelMediator;
79   }
80
81
82   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
83       UUID chatRoomId,
84       String name,
85       int shard)
86   {
87     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
88     return Mono.create(sink ->
89     {
90       ProducerRecord<String, AbstractMessageTo> record =
91           new ProducerRecord<>(
92               topic,
93               Integer.toString(shard),
94               to);
95
96       producer.send(record, ((metadata, exception) ->
97       {
98         if (exception == null)
99         {
100           log.info("Successfully sent created event for chat chat-room: {}", to);
101           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
102           sink.success(chatRoomInfo);
103         }
104         else
105         {
106           // On send-failure
107           log.error(
108               "Could not send created event for chat-room (id={}, name={}): {}",
109               chatRoomId,
110               name,
111               exception);
112           sink.error(exception);
113         }
114       }));
115     });
116   }
117
118   void sendShardAssignedEvent(int shard)
119   {
120     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
121
122     ProducerRecord<String, AbstractMessageTo> record =
123         new ProducerRecord<>(
124             topic,
125             Integer.toString(shard),
126             to);
127
128     producer.send(record, ((metadata, exception) ->
129     {
130       if (metadata != null)
131       {
132         log.info("Successfully sent shard assigned event for shard: {}", shard);
133       }
134       else
135       {
136         // On send-failure
137         log.error(
138             "Could not send shard assigned event for shard {}: {}",
139             shard,
140             exception);
141         // TODO:
142         // Verhalten im Fehlerfall durchdenken!
143         // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
144         // Consumers veranlassen, so dass die nicht öffentlich Bekannte
145         // Zuständigkeit abgegeben und neu zugeordnet wird?
146         // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
147       }
148     }));
149   }
150
151   void sendShardRevokedEvent(int shard)
152   {
153     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
154
155     ProducerRecord<String, AbstractMessageTo> record =
156         new ProducerRecord<>(
157             topic,
158             Integer.toString(shard),
159             to);
160
161     producer.send(record, ((metadata, exception) ->
162     {
163       if (metadata != null)
164       {
165         log.info("Successfully sent shard revoked event for shard: {}", shard);
166       }
167       else
168       {
169         // On send-failure
170         log.error(
171             "Could not send shard revoked event for shard {}: {}",
172             shard,
173             exception);
174         // TODO:
175         // Verhalten im Fehlerfall durchdenken!
176         // Ggf. einfach egal, da die neue zuständige Instanz den
177         // nicht gelöschten Eintrag eh überschreibt?
178       }
179     }));
180   }
181
182
183   @Override
184   public void run()
185   {
186     running = true;
187
188     consumer
189         .endOffsets(consumer.assignment())
190         .entrySet()
191         .stream()
192         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
193     IntStream
194         .range(0, numShards)
195         .forEach(partition -> this.nextOffset[partition] = 0l);
196     loadInProgress = true;
197
198     while (running)
199     {
200       try
201       {
202         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
203         log.debug("Fetched {} messages", records.count());
204         for (ConsumerRecord<String, AbstractMessageTo> record : records)
205         {
206           handleMessage(record);
207           updateNextOffset(record.partition(), record.offset() + 1);
208         }
209       }
210       catch (WakeupException e)
211       {
212         log.info("Received WakeupException, exiting!");
213         running = false;
214       }
215     }
216
217     log.info("Exiting normally");
218   }
219
220   private void updateNextOffset(int partition, long nextOffset)
221   {
222     this.nextOffset[partition] = nextOffset;
223     if (loadInProgress) {
224       loadInProgress = IntStream
225           .range(0, numShards)
226           .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
227     }
228   }
229
230   private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
231   {
232     switch (record.value().getType())
233     {
234       case EVENT_CHATROOM_CREATED:
235         EventChatRoomCreated eventChatRoomCreated =
236             (EventChatRoomCreated) record.value();
237         createChatRoom(eventChatRoomCreated.toChatRoomInfo());
238         break;
239
240       case EVENT_SHARD_ASSIGNED:
241         EventShardAssigned eventShardAssigned =
242             (EventShardAssigned) record.value();
243         log.info(
244             "Shard {} was assigned to {}",
245             eventShardAssigned.getShard(),
246             eventShardAssigned.getUri());
247         shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
248         break;
249
250       case EVENT_SHARD_REVOKED:
251         EventShardRevoked eventShardRevoked =
252             (EventShardRevoked) record.value();
253         log.info(
254             "Shard {} was revoked from {}",
255             eventShardRevoked.getShard(),
256             eventShardRevoked.getUri());
257         shardOwners[eventShardRevoked.getShard()] = null;
258         break;
259
260       default:
261         log.debug(
262             "Ignoring message for key={} with offset={}: {}",
263             record.key(),
264             record.offset(),
265             record.value());
266     }
267   }
268
269   private void createChatRoom(ChatRoomInfo chatRoomInfo)
270   {
271     UUID chatRoomId = chatRoomInfo.getId();
272     Integer partition = chatRoomInfo.getShard();
273
274     if (this.chatRoomInfo.containsKey(chatRoomId))
275     {
276       log.warn(
277           "Ignoring existing chat-room for {}: {}",
278           partition,
279           chatRoomId);
280     }
281     else
282     {
283       log.info(
284           "Adding new chat-room for partition {}: {}",
285           partition,
286           chatRoomId);
287
288       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
289       this.channelMediator.chatRoomCreated(chatRoomInfo);
290     }
291   }
292
293   Flux<ChatRoomInfo> getChatRoomInfo()
294   {
295     return Flux.fromIterable(chatRoomInfo.values());
296   }
297
298   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
299   {
300     if (loadInProgress)
301     {
302       return Mono.error(new LoadInProgressException());
303     }
304
305     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
306   }
307
308   Mono<String[]> getShardOwners()
309   {
310     return Mono.just(shardOwners);
311   }
312 }