f08a8d853c8bd0de3f53ec5206c601d263adea44
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.clients.producer.RecordMetadata;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
18 import reactor.core.publisher.Sinks;
19
20 import java.net.URI;
21 import java.time.*;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.UUID;
25 import java.util.stream.IntStream;
26
27
28 @Slf4j
29 public class InfoChannel implements Runnable
30 {
31   private final String topic;
32   private final Producer<String, AbstractMessageTo> producer;
33   private final Consumer<String, AbstractMessageTo> consumer;
34   private final int numShards;
35   private final String[] shardOwners;
36   private final long[] currentOffset;
37   private final long[] nextOffset;
38   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
39   private final String instanceUri;
40
41   private boolean running;
42
43
44   public InfoChannel(
45     String topic,
46     Producer<String, AbstractMessageTo> producer,
47     Consumer<String, AbstractMessageTo> infoChannelConsumer,
48     URI instanceUri)
49   {
50     log.debug(
51         "Creating InfoChannel for topic {}",
52         topic);
53     this.topic = topic;
54     this.consumer = infoChannelConsumer;
55     this.producer = producer;
56     this.chatRoomInfo = new HashMap<>();
57
58     this.numShards = consumer
59         .partitionsFor(topic)
60         .size();
61     this.shardOwners = new String[numShards];
62     this.currentOffset = new long[numShards];
63     this.nextOffset = new long[numShards];
64     IntStream
65         .range(0, numShards)
66         .forEach(partition -> this.nextOffset[partition] = -1l);
67
68     this.instanceUri = instanceUri.toASCIIString();
69   }
70
71
72   boolean loadInProgress()
73   {
74     return IntStream
75         .range(0, numShards)
76         .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
77   }
78
79   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
80       UUID chatRoomId,
81       String name,
82       int shard)
83   {
84     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
85     return Mono.create(sink ->
86     {
87       ProducerRecord<String, AbstractMessageTo> record =
88           new ProducerRecord<>(
89               topic,
90               Integer.toString(shard),
91               to);
92
93       producer.send(record, ((metadata, exception) ->
94       {
95         if (metadata != null)
96         {
97           log.info("Successfully sent created event for chat chat-room: {}", to);
98           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
99           sink.success(chatRoomInfo);
100         }
101         else
102         {
103           // On send-failure
104           log.error(
105               "Could not send created event for chat-room (id={}, name={}): {}",
106               chatRoomId,
107               name,
108               exception);
109           sink.error(exception);
110         }
111       }));
112     });
113   }
114
115   Mono<RecordMetadata> sendShardAssignedEvent(int shard)
116   {
117     EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
118
119     Sinks.One sink = Sinks.unsafe().one();
120
121     ProducerRecord<String, AbstractMessageTo> record =
122         new ProducerRecord<>(
123             topic,
124             Integer.toString(shard),
125             to);
126
127     producer.send(record, ((metadata, exception) ->
128     {
129       if (metadata != null)
130       {
131         log.info("Successfully sent shard assigned event for shard: {}", shard);
132         sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST);
133       }
134       else
135       {
136         // On send-failure
137         log.error(
138             "Could not send shard assigned event for shard {}: {}",
139             shard,
140             exception);
141         sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST);
142       }
143     }));
144
145     return sink.asMono();
146   }
147
148   Mono<RecordMetadata> sendShardRevokedEvent(int shard)
149   {
150     EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
151
152     Sinks.One sink = Sinks.unsafe().one();
153
154     ProducerRecord<String, AbstractMessageTo> record =
155         new ProducerRecord<>(
156             topic,
157             Integer.toString(shard),
158             to);
159
160     producer.send(record, ((metadata, exception) ->
161     {
162       if (metadata != null)
163       {
164         log.info("Successfully sent shard revoked event for shard: {}", shard);
165         sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST);
166       }
167       else
168       {
169         // On send-failure
170         log.error(
171             "Could not send shard revoked event for shard {}: {}",
172             shard,
173             exception);
174         sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST);
175       }
176     }));
177
178     return sink.asMono();
179   }
180
181
182   @Override
183   public void run()
184   {
185     running = true;
186
187     consumer
188         .endOffsets(consumer.assignment())
189         .entrySet()
190         .stream()
191         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
192     IntStream
193         .range(0, numShards)
194         .forEach(partition -> this.nextOffset[partition] = 0l);
195
196     while (running)
197     {
198       try
199       {
200         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
201         log.debug("Fetched {} messages", records.count());
202         handleMessages(records);
203       }
204       catch (WakeupException e)
205       {
206         log.info("Received WakeupException, exiting!");
207         running = false;
208       }
209     }
210
211     log.info("Exiting normally");
212   }
213
214   private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
215   {
216     for (ConsumerRecord<String, AbstractMessageTo> record : records)
217     {
218       switch (record.value().getType())
219       {
220         case EVENT_CHATROOM_CREATED:
221           EventChatRoomCreated eventChatRoomCreated =
222               (EventChatRoomCreated) record.value();
223           createChatRoom(eventChatRoomCreated.toChatRoomInfo());
224           break;
225
226         case EVENT_SHARD_ASSIGNED:
227           EventShardAssigned eventShardAssigned =
228               (EventShardAssigned) record.value();
229           log.info(
230               "Shard {} was assigned to {}",
231               eventShardAssigned.getShard(),
232               eventShardAssigned.getUri());
233           shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
234           break;
235
236         case EVENT_SHARD_REVOKED:
237           EventShardRevoked eventShardRevoked =
238               (EventShardRevoked) record.value();
239           log.info(
240               "Shard {} was revoked from {}",
241               eventShardRevoked.getShard(),
242               eventShardRevoked.getUri());
243           shardOwners[eventShardRevoked.getShard()] = null;
244           break;
245
246         default:
247           log.debug(
248               "Ignoring message for key={} with offset={}: {}",
249               record.key(),
250               record.offset(),
251               record.value());
252       }
253
254       nextOffset[record.partition()] = record.offset() + 1;
255     }
256   }
257
258   private void createChatRoom(ChatRoomInfo chatRoomInfo)
259   {
260     UUID chatRoomId = chatRoomInfo.getId();
261     Integer partition = chatRoomInfo.getShard();
262
263     if (this.chatRoomInfo.containsKey(chatRoomId))
264     {
265       log.warn(
266           "Ignoring existing chat-room for {}: {}",
267           partition,
268           chatRoomId);
269     }
270     else
271     {
272       log.info(
273           "Adding new chat-room for partition {}: {}",
274           partition,
275           chatRoomId);
276
277       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
278     }
279   }
280
281   Flux<ChatRoomInfo> getChatRoomInfo()
282   {
283     return Flux.fromIterable(chatRoomInfo.values());
284   }
285
286   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
287   {
288     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
289   }
290
291   Mono<String[]> getShardOwners()
292   {
293     return Mono.just(shardOwners);
294   }
295 }