WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.clients.producer.RecordMetadata;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
18
19 import java.net.URI;
20 import java.time.*;
21 import java.util.HashMap;
22 import java.util.Map;
23 import java.util.UUID;
24 import java.util.stream.IntStream;
25
26
27 @Slf4j
28 public class InfoChannel implements Runnable
29 {
30   private final String topic;
31   private final Producer<String, AbstractMessageTo> producer;
32   private final Consumer<String, AbstractMessageTo> consumer;
33   private final int numShards;
34   private final String[] shardOwners;
35   private final long[] currentOffset;
36   private final long[] nextOffset;
37   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
38   private final String instanceUri;
39
40   private boolean running;
41
42
43   public InfoChannel(
44     String topic,
45     Producer<String, AbstractMessageTo> producer,
46     Consumer<String, AbstractMessageTo> infoChannelConsumer,
47     URI instanceUri)
48   {
49     log.debug(
50         "Creating InfoChannel for topic {}",
51         topic);
52     this.topic = topic;
53     this.consumer = infoChannelConsumer;
54     this.producer = producer;
55     this.chatRoomInfo = new HashMap<>();
56
57     this.numShards = consumer
58         .partitionsFor(topic)
59         .size();
60     this.shardOwners = new String[numShards];
61     this.currentOffset = new long[numShards];
62     this.nextOffset = new long[numShards];
63     IntStream
64         .range(0, numShards)
65         .forEach(partition -> this.nextOffset[partition] = -1l);
66
67     this.instanceUri = instanceUri.toASCIIString();
68   }
69
70
71   boolean loadInProgress()
72   {
73     return IntStream
74         .range(0, numShards)
75         .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
76   }
77
78   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
79       UUID chatRoomId,
80       String name,
81       int shard)
82   {
83     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
84     return Mono.create(sink ->
85     {
86       ProducerRecord<String, AbstractMessageTo> record =
87           new ProducerRecord<>(
88               topic,
89               Integer.toString(shard),
90               to);
91
92       producer.send(record, ((metadata, exception) ->
93       {
94         if (metadata != null)
95         {
96           log.info("Successfully sent created event for chat chat-room: {}", to);
97           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
98           sink.success(chatRoomInfo);
99         }
100         else
101         {
102           // On send-failure
103           log.error(
104               "Could not send created event for chat-room (id={}, name={}): {}",
105               chatRoomId,
106               name,
107               exception);
108           sink.error(exception);
109         }
110       }));
111     });
112   }
113
114   Mono<RecordMetadata> sendShardAssignedEvent(int shard)
115   {
116     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
117
118     return Mono.create(sink ->
119     {
120       ProducerRecord<String, AbstractMessageTo> record =
121           new ProducerRecord<>(
122               topic,
123               Integer.toString(shard),
124               to);
125
126       producer.send(record, ((metadata, exception) ->
127       {
128         if (metadata != null)
129         {
130           log.info("Successfully sent shard assigned event for shard: {}", shard);
131           sink.success(metadata);
132         }
133         else
134         {
135           // On send-failure
136           log.error(
137               "Could not send shard assigned event for shard {}: {}",
138               shard,
139               exception);
140           sink.error(exception);
141         }
142       }));
143     });
144   }
145
146   Mono<RecordMetadata> sendShardRevokedEvent(int shard)
147   {
148     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
149
150     return Mono.create(sink ->
151     {
152       ProducerRecord<String, AbstractMessageTo> record =
153           new ProducerRecord<>(
154               topic,
155               Integer.toString(shard),
156               to);
157
158       producer.send(record, ((metadata, exception) ->
159       {
160         if (metadata != null)
161         {
162           log.info("Successfully sent shard revoked event for shard: {}", shard);
163           sink.success(metadata);
164         }
165         else
166         {
167           // On send-failure
168           log.error(
169               "Could not send shard revoked event for shard {}: {}",
170               shard,
171               exception);
172           sink.error(exception);
173         }
174       }));
175     });
176   }
177
178
179   @Override
180   public void run()
181   {
182     running = true;
183
184     consumer
185         .endOffsets(consumer.assignment())
186         .entrySet()
187         .stream()
188         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
189     IntStream
190         .range(0, numShards)
191         .forEach(partition -> this.nextOffset[partition] = 0l);
192
193     while (running)
194     {
195       try
196       {
197         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
198         log.debug("Fetched {} messages", records.count());
199         handleMessages(records);
200       }
201       catch (WakeupException e)
202       {
203         log.info("Received WakeupException, exiting!");
204         running = false;
205       }
206     }
207
208     log.info("Exiting normally");
209   }
210
211   private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
212   {
213     for (ConsumerRecord<String, AbstractMessageTo> record : records)
214     {
215       switch (record.value().getType())
216       {
217         case EVENT_CHATROOM_CREATED:
218           EventChatRoomCreated eventChatRoomCreated =
219               (EventChatRoomCreated) record.value();
220           createChatRoom(eventChatRoomCreated.toChatRoomInfo());
221           break;
222
223         case EVENT_SHARD_ASSIGNED:
224           EventShardAssigned eventShardAssigned =
225               (EventShardAssigned) record.value();
226           log.info(
227               "Shard {} was assigned to {}",
228               eventShardAssigned.getShard(),
229               eventShardAssigned.getUri());
230           shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
231           break;
232
233         case EVENT_SHARD_REVOKED:
234           EventShardRevoked eventShardRevoked =
235               (EventShardRevoked) record.value();
236           log.info(
237               "Shard {} was revoked from {}",
238               eventShardRevoked.getShard(),
239               eventShardRevoked.getUri());
240           shardOwners[eventShardRevoked.getShard()] = null;
241           break;
242
243         default:
244           log.debug(
245               "Ignoring message for key={} with offset={}: {}",
246               record.key(),
247               record.offset(),
248               record.value());
249       }
250
251       nextOffset[record.partition()] = record.offset() + 1;
252     }
253   }
254
255   private void createChatRoom(ChatRoomInfo chatRoomInfo)
256   {
257     UUID chatRoomId = chatRoomInfo.getId();
258     Integer partition = chatRoomInfo.getShard();
259
260     if (this.chatRoomInfo.containsKey(chatRoomId))
261     {
262       log.warn(
263           "Ignoring existing chat-room for {}: {}",
264           partition,
265           chatRoomId);
266     }
267     else
268     {
269       log.info(
270           "Adding new chat-room for partition {}: {}",
271           partition,
272           chatRoomId);
273
274       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
275     }
276   }
277
278   Flux<ChatRoomInfo> getChatRoomInfo()
279   {
280     return Flux.fromIterable(chatRoomInfo.values());
281   }
282
283   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
284   {
285     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
286   }
287
288   Mono<String[]> getShardOwners()
289   {
290     return Mono.just(shardOwners);
291   }
292 }