WIP: shard assigned/revoked events
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.clients.producer.RecordMetadata;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
18
19 import java.net.URI;
20 import java.time.*;
21 import java.util.HashMap;
22 import java.util.Map;
23 import java.util.UUID;
24 import java.util.stream.IntStream;
25
26
27 @Slf4j
28 public class InfoChannel implements Runnable
29 {
30   private final String topic;
31   private final Producer<String, AbstractMessageTo> producer;
32   private final Consumer<String, AbstractMessageTo> consumer;
33   private final int numShards;
34   private final String[] shardOwners;
35   private final long[] currentOffset;
36   private final long[] nextOffset;
37   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
38   private final String instanceUri;
39
40   private boolean running;
41
42
43   public InfoChannel(
44     String topic,
45     Producer<String, AbstractMessageTo> producer,
46     Consumer<String, AbstractMessageTo> infoChannelConsumer,
47     URI instanceUri)
48   {
49     log.debug(
50         "Creating InfoChannel for topic {}",
51         topic);
52     this.topic = topic;
53     this.consumer = infoChannelConsumer;
54     this.producer = producer;
55     this.chatRoomInfo = new HashMap<>();
56
57     this.numShards = consumer
58         .partitionsFor(topic)
59         .size();
60     this.shardOwners = new String[numShards];
61     this.currentOffset = new long[numShards];
62     this.nextOffset = new long[numShards];
63     IntStream
64         .range(0, numShards)
65         .forEach(partition -> this.nextOffset[partition] = -1l);
66
67     this.instanceUri = instanceUri.toASCIIString();
68   }
69
70
71   boolean loadInProgress()
72   {
73     return IntStream
74         .range(0, numShards)
75         .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
76   }
77
78   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
79       UUID chatRoomId,
80       String name,
81       int shard)
82   {
83     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
84     return Mono.create(sink ->
85     {
86       ProducerRecord<String, AbstractMessageTo> record =
87           new ProducerRecord<>(
88               topic,
89               Integer.toString(shard),
90               to);
91
92       producer.send(record, ((metadata, exception) ->
93       {
94         if (exception == null)
95         {
96           log.info("Successfully sent created event for chat chat-room: {}", to);
97           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
98           sink.success(chatRoomInfo);
99         }
100         else
101         {
102           // On send-failure
103           log.error(
104               "Could not send created event for chat-room (id={}, name={}): {}",
105               chatRoomId,
106               name,
107               exception);
108           sink.error(exception);
109         }
110       }));
111     });
112   }
113
114   void sendShardAssignedEvent(int shard)
115   {
116     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
117
118     ProducerRecord<String, AbstractMessageTo> record =
119         new ProducerRecord<>(
120             topic,
121             Integer.toString(shard),
122             to);
123
124     producer.send(record, ((metadata, exception) ->
125     {
126       if (metadata != null)
127       {
128         log.info("Successfully sent shard assigned event for shard: {}", shard);
129       }
130       else
131       {
132         // On send-failure
133         log.error(
134             "Could not send shard assigned event for shard {}: {}",
135             shard,
136             exception);
137       }
138     }));
139   }
140
141   void sendShardRevokedEvent(int shard)
142   {
143     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
144
145     ProducerRecord<String, AbstractMessageTo> record =
146         new ProducerRecord<>(
147             topic,
148             Integer.toString(shard),
149             to);
150
151     producer.send(record, ((metadata, exception) ->
152     {
153       if (metadata != null)
154       {
155         log.info("Successfully sent shard revoked event for shard: {}", shard);
156       }
157       else
158       {
159         // On send-failure
160         log.error(
161             "Could not send shard revoked event for shard {}: {}",
162             shard,
163             exception);
164       }
165     }));
166   }
167
168
169   @Override
170   public void run()
171   {
172     running = true;
173
174     consumer
175         .endOffsets(consumer.assignment())
176         .entrySet()
177         .stream()
178         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
179     IntStream
180         .range(0, numShards)
181         .forEach(partition -> this.nextOffset[partition] = 0l);
182
183     while (running)
184     {
185       try
186       {
187         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
188         log.debug("Fetched {} messages", records.count());
189         handleMessages(records);
190       }
191       catch (WakeupException e)
192       {
193         log.info("Received WakeupException, exiting!");
194         running = false;
195       }
196     }
197
198     log.info("Exiting normally");
199   }
200
201   private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
202   {
203     for (ConsumerRecord<String, AbstractMessageTo> record : records)
204     {
205       switch (record.value().getType())
206       {
207         case EVENT_CHATROOM_CREATED:
208           EventChatRoomCreated eventChatRoomCreated =
209               (EventChatRoomCreated) record.value();
210           createChatRoom(eventChatRoomCreated.toChatRoomInfo());
211           break;
212
213         case EVENT_SHARD_ASSIGNED:
214           EventShardAssigned eventShardAssigned =
215               (EventShardAssigned) record.value();
216           log.info(
217               "Shard {} was assigned to {}",
218               eventShardAssigned.getShard(),
219               eventShardAssigned.getUri());
220           shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
221           break;
222
223         case EVENT_SHARD_REVOKED:
224           EventShardRevoked eventShardRevoked =
225               (EventShardRevoked) record.value();
226           log.info(
227               "Shard {} was revoked from {}",
228               eventShardRevoked.getShard(),
229               eventShardRevoked.getUri());
230           shardOwners[eventShardRevoked.getShard()] = null;
231           break;
232
233         default:
234           log.debug(
235               "Ignoring message for key={} with offset={}: {}",
236               record.key(),
237               record.offset(),
238               record.value());
239       }
240
241       nextOffset[record.partition()] = record.offset() + 1;
242     }
243   }
244
245   private void createChatRoom(ChatRoomInfo chatRoomInfo)
246   {
247     UUID chatRoomId = chatRoomInfo.getId();
248     Integer partition = chatRoomInfo.getShard();
249
250     if (this.chatRoomInfo.containsKey(chatRoomId))
251     {
252       log.warn(
253           "Ignoring existing chat-room for {}: {}",
254           partition,
255           chatRoomId);
256     }
257     else
258     {
259       log.info(
260           "Adding new chat-room for partition {}: {}",
261           partition,
262           chatRoomId);
263
264       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
265     }
266   }
267
268   Flux<ChatRoomInfo> getChatRoomInfo()
269   {
270     return Flux.fromIterable(chatRoomInfo.values());
271   }
272
273   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
274   {
275     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
276   }
277
278   Mono<String[]> getShardOwners()
279   {
280     return Mono.just(shardOwners);
281   }
282 }