FIX
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
9 import lombok.Getter;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRecord;
13 import org.apache.kafka.clients.consumer.ConsumerRecords;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerRecord;
16 import org.apache.kafka.clients.producer.RecordMetadata;
17 import org.apache.kafka.common.errors.WakeupException;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
20
21 import java.net.URI;
22 import java.time.*;
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.UUID;
26 import java.util.stream.IntStream;
27
28
29 @Slf4j
30 public class InfoChannel implements Runnable
31 {
32   private final String topic;
33   private final Producer<String, AbstractMessageTo> producer;
34   private final Consumer<String, AbstractMessageTo> consumer;
35   private final int numShards;
36   private final String[] shardOwners;
37   private final long[] currentOffset;
38   private final long[] nextOffset;
39   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
40   private final String instanceUri;
41
42   private boolean running;
43   @Getter
44   private volatile boolean loadInProgress = true;
45
46
47   public InfoChannel(
48     String topic,
49     Producer<String, AbstractMessageTo> producer,
50     Consumer<String, AbstractMessageTo> infoChannelConsumer,
51     URI instanceUri)
52   {
53     log.debug(
54         "Creating InfoChannel for topic {}",
55         topic);
56     this.topic = topic;
57     this.consumer = infoChannelConsumer;
58     this.producer = producer;
59     this.chatRoomInfo = new HashMap<>();
60
61     this.numShards = consumer
62         .partitionsFor(topic)
63         .size();
64     this.shardOwners = new String[numShards];
65     this.currentOffset = new long[numShards];
66     this.nextOffset = new long[numShards];
67     IntStream
68         .range(0, numShards)
69         .forEach(partition -> this.nextOffset[partition] = -1l);
70
71     this.instanceUri = instanceUri.toASCIIString();
72   }
73
74
75   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
76       UUID chatRoomId,
77       String name,
78       int shard)
79   {
80     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
81     return Mono.create(sink ->
82     {
83       ProducerRecord<String, AbstractMessageTo> record =
84           new ProducerRecord<>(
85               topic,
86               Integer.toString(shard),
87               to);
88
89       producer.send(record, ((metadata, exception) ->
90       {
91         if (exception == null)
92         {
93           log.info("Successfully sent created event for chat chat-room: {}", to);
94           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
95           sink.success(chatRoomInfo);
96         }
97         else
98         {
99           // On send-failure
100           log.error(
101               "Could not send created event for chat-room (id={}, name={}): {}",
102               chatRoomId,
103               name,
104               exception);
105           sink.error(exception);
106         }
107       }));
108     });
109   }
110
111   void sendShardAssignedEvent(int shard)
112   {
113     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
114
115     ProducerRecord<String, AbstractMessageTo> record =
116         new ProducerRecord<>(
117             topic,
118             Integer.toString(shard),
119             to);
120
121     producer.send(record, ((metadata, exception) ->
122     {
123       if (metadata != null)
124       {
125         log.info("Successfully sent shard assigned event for shard: {}", shard);
126       }
127       else
128       {
129         // On send-failure
130         log.error(
131             "Could not send shard assigned event for shard {}: {}",
132             shard,
133             exception);
134         // TODO:
135         // Verhalten im Fehlerfall durchdenken!
136         // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
137         // Consumers veranlassen, so dass die nicht öffentlich Bekannte
138         // Zuständigkeit abgegeben und neu zugeordnet wird?
139         // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
140       }
141     }));
142   }
143
144   void sendShardRevokedEvent(int shard)
145   {
146     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
147
148     ProducerRecord<String, AbstractMessageTo> record =
149         new ProducerRecord<>(
150             topic,
151             Integer.toString(shard),
152             to);
153
154     producer.send(record, ((metadata, exception) ->
155     {
156       if (metadata != null)
157       {
158         log.info("Successfully sent shard revoked event for shard: {}", shard);
159       }
160       else
161       {
162         // On send-failure
163         log.error(
164             "Could not send shard revoked event for shard {}: {}",
165             shard,
166             exception);
167         // TODO:
168         // Verhalten im Fehlerfall durchdenken!
169         // Ggf. einfach egal, da die neue zuständige Instanz den
170         // nicht gelöschten Eintrag eh überschreibt?
171       }
172     }));
173   }
174
175
176   @Override
177   public void run()
178   {
179     running = true;
180
181     consumer
182         .endOffsets(consumer.assignment())
183         .entrySet()
184         .stream()
185         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
186     IntStream
187         .range(0, numShards)
188         .forEach(partition -> this.nextOffset[partition] = 0l);
189     loadInProgress = true;
190
191     while (running)
192     {
193       try
194       {
195         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
196         log.debug("Fetched {} messages", records.count());
197         for (ConsumerRecord<String, AbstractMessageTo> record : records)
198         {
199           handleMessage(record);
200           updateNextOffset(record.partition(), record.offset() + 1);
201         }
202       }
203       catch (WakeupException e)
204       {
205         log.info("Received WakeupException, exiting!");
206         running = false;
207       }
208     }
209
210     log.info("Exiting normally");
211   }
212
213   private void updateNextOffset(int partition, long nextOffset)
214   {
215     this.nextOffset[partition] = nextOffset;
216     if (loadInProgress) {
217       loadInProgress = IntStream
218           .range(0, numShards)
219           .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
220     }
221   }
222
223   private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
224   {
225     switch (record.value().getType())
226     {
227       case EVENT_CHATROOM_CREATED:
228         EventChatRoomCreated eventChatRoomCreated =
229             (EventChatRoomCreated) record.value();
230         createChatRoom(eventChatRoomCreated.toChatRoomInfo());
231         break;
232
233       case EVENT_SHARD_ASSIGNED:
234         EventShardAssigned eventShardAssigned =
235             (EventShardAssigned) record.value();
236         log.info(
237             "Shard {} was assigned to {}",
238             eventShardAssigned.getShard(),
239             eventShardAssigned.getUri());
240         shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
241         break;
242
243       case EVENT_SHARD_REVOKED:
244         EventShardRevoked eventShardRevoked =
245             (EventShardRevoked) record.value();
246         log.info(
247             "Shard {} was revoked from {}",
248             eventShardRevoked.getShard(),
249             eventShardRevoked.getUri());
250         shardOwners[eventShardRevoked.getShard()] = null;
251         break;
252
253       default:
254         log.debug(
255             "Ignoring message for key={} with offset={}: {}",
256             record.key(),
257             record.offset(),
258             record.value());
259     }
260   }
261
262   private void createChatRoom(ChatRoomInfo chatRoomInfo)
263   {
264     UUID chatRoomId = chatRoomInfo.getId();
265     Integer partition = chatRoomInfo.getShard();
266
267     if (this.chatRoomInfo.containsKey(chatRoomId))
268     {
269       log.warn(
270           "Ignoring existing chat-room for {}: {}",
271           partition,
272           chatRoomId);
273     }
274     else
275     {
276       log.info(
277           "Adding new chat-room for partition {}: {}",
278           partition,
279           chatRoomId);
280
281       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
282     }
283   }
284
285   Flux<ChatRoomInfo> getChatRoomInfo()
286   {
287     return Flux.fromIterable(chatRoomInfo.values());
288   }
289
290   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
291   {
292     if (loadInProgress)
293     {
294       return Mono.error(new LoadInProgressException());
295     }
296
297     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
298   }
299
300   Mono<String[]> getShardOwners()
301   {
302     return Mono.just(shardOwners);
303   }
304 }