fix: Errors during shard-publishing should not kill the instance
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.WakeupException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.time.*;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
27
28
29 @Slf4j
30 public class InfoChannel implements Runnable
31 {
32   private final String topic;
33   private final Producer<String, AbstractMessageTo> producer;
34   private final Consumer<String, AbstractMessageTo> consumer;
35   private final Duration pollingInterval;
36   private final int numShards;
37   private final String[] shardOwners;
38   private final long[] currentOffset;
39   private final long[] nextOffset;
40   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
41   private final String instanceUri;
42
43   private boolean running;
44   @Getter
45   private volatile boolean loadInProgress = true;
46
47
48   public InfoChannel(
49     String topic,
50     Producer<String, AbstractMessageTo> producer,
51     Consumer<String, AbstractMessageTo> infoChannelConsumer,
52     Duration pollingInterval,
53     int numShards,
54     URI instanceUri)
55   {
56     log.debug(
57         "Creating InfoChannel for topic {}",
58         topic);
59     this.topic = topic;
60     this.consumer = infoChannelConsumer;
61     this.producer = producer;
62     this.chatRoomInfo = new HashMap<>();
63
64     this.pollingInterval = pollingInterval;
65
66     this.numShards = numShards;
67     this.shardOwners = new String[numShards];
68     this.currentOffset = new long[numShards];
69     this.nextOffset = new long[numShards];
70     IntStream
71         .range(0, numShards)
72         .forEach(partition -> this.nextOffset[partition] = -1l);
73
74     this.instanceUri = instanceUri.toASCIIString();
75   }
76
77
78   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
79       UUID chatRoomId,
80       String name,
81       int shard)
82   {
83     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
84     return Mono.create(sink ->
85     {
86       ProducerRecord<String, AbstractMessageTo> record =
87           new ProducerRecord<>(
88               topic,
89               Integer.toString(shard),
90               to);
91
92       producer.send(record, ((metadata, exception) ->
93       {
94         if (exception == null)
95         {
96           log.info("Successfully sent created event for chat chat-room: {}", to);
97           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
98           sink.success(chatRoomInfo);
99         }
100         else
101         {
102           // On send-failure
103           log.error(
104               "Could not send created event for chat-room (id={}, name={}): {}",
105               chatRoomId,
106               name,
107               exception);
108           sink.error(exception);
109         }
110       }));
111     });
112   }
113
114   void sendShardAssignedEvent(int shard)
115   {
116     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
117
118     ProducerRecord<String, AbstractMessageTo> record =
119         new ProducerRecord<>(
120             topic,
121             Integer.toString(shard),
122             to);
123
124     producer.send(record, ((metadata, exception) ->
125     {
126       if (metadata != null)
127       {
128         log.info("Successfully sent shard assigned event for shard: {}", shard);
129       }
130       else
131       {
132         // On send-failure
133         log.error(
134             "Could not send shard assigned event for shard {}: {}",
135             shard,
136             exception);
137         // TODO:
138         // Verhalten im Fehlerfall durchdenken!
139         // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
140         // Consumers veranlassen, so dass die nicht öffentlich Bekannte
141         // Zuständigkeit abgegeben und neu zugeordnet wird?
142         // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
143       }
144     }));
145   }
146
147   void sendShardRevokedEvent(int shard)
148   {
149     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
150
151     ProducerRecord<String, AbstractMessageTo> record =
152         new ProducerRecord<>(
153             topic,
154             Integer.toString(shard),
155             to);
156
157     producer.send(record, ((metadata, exception) ->
158     {
159       if (metadata != null)
160       {
161         log.info("Successfully sent shard revoked event for shard: {}", shard);
162       }
163       else
164       {
165         // On send-failure
166         log.error(
167             "Could not send shard revoked event for shard {}: {}",
168             shard,
169             exception);
170         // TODO:
171         // Verhalten im Fehlerfall durchdenken!
172         // Ggf. einfach egal, da die neue zuständige Instanz den
173         // nicht gelöschten Eintrag eh überschreibt?
174       }
175     }));
176   }
177
178
179   @Override
180   public void run()
181   {
182     running = true;
183
184     consumer
185         .endOffsets(consumer.assignment())
186         .entrySet()
187         .stream()
188         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
189     IntStream
190         .range(0, numShards)
191         .forEach(partition -> this.nextOffset[partition] = 0l);
192     loadInProgress = true;
193
194     while (running)
195     {
196       try
197       {
198         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
199         log.debug("Fetched {} messages", records.count());
200         for (ConsumerRecord<String, AbstractMessageTo> record : records)
201         {
202           handleMessage(record);
203           updateNextOffset(record.partition(), record.offset() + 1);
204         }
205       }
206       catch (WakeupException e)
207       {
208         log.info("Received WakeupException, exiting!");
209         running = false;
210       }
211     }
212
213     log.info("Exiting normally");
214   }
215
216   private void updateNextOffset(int partition, long nextOffset)
217   {
218     this.nextOffset[partition] = nextOffset;
219     if (loadInProgress) {
220       loadInProgress = IntStream
221           .range(0, numShards)
222           .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
223     }
224   }
225
226   private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
227   {
228     switch (record.value().getType())
229     {
230       case EVENT_CHATROOM_CREATED:
231         EventChatRoomCreated eventChatRoomCreated =
232             (EventChatRoomCreated) record.value();
233         createChatRoom(eventChatRoomCreated.toChatRoomInfo());
234         break;
235
236       case EVENT_SHARD_ASSIGNED:
237         EventShardAssigned eventShardAssigned =
238             (EventShardAssigned) record.value();
239         log.info(
240             "Shard {} was assigned to {}",
241             eventShardAssigned.getShard(),
242             eventShardAssigned.getUri());
243         shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
244         break;
245
246       case EVENT_SHARD_REVOKED:
247         EventShardRevoked eventShardRevoked =
248             (EventShardRevoked) record.value();
249         log.info(
250             "Shard {} was revoked from {}",
251             eventShardRevoked.getShard(),
252             eventShardRevoked.getUri());
253         shardOwners[eventShardRevoked.getShard()] = null;
254         break;
255
256       default:
257         log.debug(
258             "Ignoring message for key={} with offset={}: {}",
259             record.key(),
260             record.offset(),
261             record.value());
262     }
263   }
264
265   private void createChatRoom(ChatRoomInfo chatRoomInfo)
266   {
267     UUID chatRoomId = chatRoomInfo.getId();
268     Integer partition = chatRoomInfo.getShard();
269
270     if (this.chatRoomInfo.containsKey(chatRoomId))
271     {
272       log.warn(
273           "Ignoring existing chat-room for {}: {}",
274           partition,
275           chatRoomId);
276     }
277     else
278     {
279       log.info(
280           "Adding new chat-room for partition {}: {}",
281           partition,
282           chatRoomId);
283
284       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
285     }
286   }
287
288   Flux<ChatRoomInfo> getChatRoomInfo()
289   {
290     return Flux.fromIterable(chatRoomInfo.values());
291   }
292
293   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
294   {
295     if (loadInProgress)
296     {
297       return Mono.error(new LoadInProgressException());
298     }
299
300     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
301   }
302
303   Mono<String[]> getShardOwners()
304   {
305     return Mono.just(shardOwners);
306   }
307 }