refactor: compute `loadInProgress` on offset-change
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.Getter;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.clients.producer.RecordMetadata;
16 import org.apache.kafka.common.errors.WakeupException;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
19
20 import java.net.URI;
21 import java.time.*;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.UUID;
25 import java.util.stream.IntStream;
26
27
28 @Slf4j
29 public class InfoChannel implements Runnable
30 {
31   private final String topic;
32   private final Producer<String, AbstractMessageTo> producer;
33   private final Consumer<String, AbstractMessageTo> consumer;
34   private final int numShards;
35   private final String[] shardOwners;
36   private final long[] currentOffset;
37   private final long[] nextOffset;
38   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
39   private final String instanceUri;
40
41   private boolean running;
42   @Getter
43   private volatile boolean loadInProgress = true;
44
45
46   public InfoChannel(
47     String topic,
48     Producer<String, AbstractMessageTo> producer,
49     Consumer<String, AbstractMessageTo> infoChannelConsumer,
50     URI instanceUri)
51   {
52     log.debug(
53         "Creating InfoChannel for topic {}",
54         topic);
55     this.topic = topic;
56     this.consumer = infoChannelConsumer;
57     this.producer = producer;
58     this.chatRoomInfo = new HashMap<>();
59
60     this.numShards = consumer
61         .partitionsFor(topic)
62         .size();
63     this.shardOwners = new String[numShards];
64     this.currentOffset = new long[numShards];
65     this.nextOffset = new long[numShards];
66     IntStream
67         .range(0, numShards)
68         .forEach(partition -> this.nextOffset[partition] = -1l);
69
70     this.instanceUri = instanceUri.toASCIIString();
71   }
72
73
74   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
75       UUID chatRoomId,
76       String name,
77       int shard)
78   {
79     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
80     return Mono.create(sink ->
81     {
82       ProducerRecord<String, AbstractMessageTo> record =
83           new ProducerRecord<>(
84               topic,
85               Integer.toString(shard),
86               to);
87
88       producer.send(record, ((metadata, exception) ->
89       {
90         if (exception == null)
91         {
92           log.info("Successfully sent created event for chat chat-room: {}", to);
93           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
94           sink.success(chatRoomInfo);
95         }
96         else
97         {
98           // On send-failure
99           log.error(
100               "Could not send created event for chat-room (id={}, name={}): {}",
101               chatRoomId,
102               name,
103               exception);
104           sink.error(exception);
105         }
106       }));
107     });
108   }
109
110   void sendShardAssignedEvent(int shard)
111   {
112     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
113
114     ProducerRecord<String, AbstractMessageTo> record =
115         new ProducerRecord<>(
116             topic,
117             Integer.toString(shard),
118             to);
119
120     producer.send(record, ((metadata, exception) ->
121     {
122       if (metadata != null)
123       {
124         log.info("Successfully sent shard assigned event for shard: {}", shard);
125       }
126       else
127       {
128         // On send-failure
129         log.error(
130             "Could not send shard assigned event for shard {}: {}",
131             shard,
132             exception);
133         // TODO:
134         // Verhalten im Fehlerfall durchdenken!
135         // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
136         // Consumers veranlassen, so dass die nicht öffentlich Bekannte
137         // Zuständigkeit abgegeben und neu zugeordnet wird?
138         // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
139       }
140     }));
141   }
142
143   void sendShardRevokedEvent(int shard)
144   {
145     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
146
147     ProducerRecord<String, AbstractMessageTo> record =
148         new ProducerRecord<>(
149             topic,
150             Integer.toString(shard),
151             to);
152
153     producer.send(record, ((metadata, exception) ->
154     {
155       if (metadata != null)
156       {
157         log.info("Successfully sent shard revoked event for shard: {}", shard);
158       }
159       else
160       {
161         // On send-failure
162         log.error(
163             "Could not send shard revoked event for shard {}: {}",
164             shard,
165             exception);
166         // TODO:
167         // Verhalten im Fehlerfall durchdenken!
168         // Ggf. einfach egal, da die neue zuständige Instanz den
169         // nicht gelöschten Eintrag eh überschreibt?
170       }
171     }));
172   }
173
174
175   @Override
176   public void run()
177   {
178     running = true;
179
180     consumer
181         .endOffsets(consumer.assignment())
182         .entrySet()
183         .stream()
184         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
185     IntStream
186         .range(0, numShards)
187         .forEach(partition -> this.nextOffset[partition] = 0l);
188     loadInProgress = true;
189
190     while (running)
191     {
192       try
193       {
194         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
195         log.debug("Fetched {} messages", records.count());
196         for (ConsumerRecord<String, AbstractMessageTo> record : records)
197         {
198           handleMessage(record);
199           updateNextOffset(record.partition(), record.offset() + 1);
200         }
201       }
202       catch (WakeupException e)
203       {
204         log.info("Received WakeupException, exiting!");
205         running = false;
206       }
207     }
208
209     log.info("Exiting normally");
210   }
211
212   private void updateNextOffset(int partition, long nextOffset)
213   {
214     this.nextOffset[partition] = nextOffset;
215     if (loadInProgress) {
216       loadInProgress = IntStream
217           .range(0, numShards)
218           .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
219     }
220   }
221
222   private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
223   {
224     switch (record.value().getType())
225     {
226       case EVENT_CHATROOM_CREATED:
227         EventChatRoomCreated eventChatRoomCreated =
228             (EventChatRoomCreated) record.value();
229         createChatRoom(eventChatRoomCreated.toChatRoomInfo());
230         break;
231
232       case EVENT_SHARD_ASSIGNED:
233         EventShardAssigned eventShardAssigned =
234             (EventShardAssigned) record.value();
235         log.info(
236             "Shard {} was assigned to {}",
237             eventShardAssigned.getShard(),
238             eventShardAssigned.getUri());
239         shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
240         break;
241
242       case EVENT_SHARD_REVOKED:
243         EventShardRevoked eventShardRevoked =
244             (EventShardRevoked) record.value();
245         log.info(
246             "Shard {} was revoked from {}",
247             eventShardRevoked.getShard(),
248             eventShardRevoked.getUri());
249         shardOwners[eventShardRevoked.getShard()] = null;
250         break;
251
252       default:
253         log.debug(
254             "Ignoring message for key={} with offset={}: {}",
255             record.key(),
256             record.offset(),
257             record.value());
258     }
259   }
260
261   private void createChatRoom(ChatRoomInfo chatRoomInfo)
262   {
263     UUID chatRoomId = chatRoomInfo.getId();
264     Integer partition = chatRoomInfo.getShard();
265
266     if (this.chatRoomInfo.containsKey(chatRoomId))
267     {
268       log.warn(
269           "Ignoring existing chat-room for {}: {}",
270           partition,
271           chatRoomId);
272     }
273     else
274     {
275       log.info(
276           "Adding new chat-room for partition {}: {}",
277           partition,
278           chatRoomId);
279
280       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
281     }
282   }
283
284   Flux<ChatRoomInfo> getChatRoomInfo()
285   {
286     return Flux.fromIterable(chatRoomInfo.values());
287   }
288
289   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
290   {
291     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
292   }
293
294   Mono<String[]> getShardOwners()
295   {
296     return Mono.just(shardOwners);
297   }
298 }