1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.clients.producer.RecordMetadata;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
18 import reactor.core.publisher.Sinks;
22 import java.util.HashMap;
24 import java.util.UUID;
25 import java.util.stream.IntStream;
// Channel for the "info" events handled in this class: chat-room creation and
// shard assignment/revocation. It sends such events through the producer and
// applies received ones to its in-memory state (chatRoomInfo, shardOwners).
29 public class InfoChannel implements Runnable
// Topic this channel was created for (logged by the constructor).
31 private final String topic;
// Used by the send*Event methods to publish event records.
32 private final Producer<String, AbstractMessageTo> producer;
// Polled for event records and queried for the end offsets of its assignment.
33 private final Consumer<String, AbstractMessageTo> consumer;
// Length of the per-shard arrays below.
34 private final int numShards;
// Per shard: URI from the last EventShardAssigned, or null after an EventShardRevoked.
35 private final String[] shardOwners;
// Per partition: end offset as reported by consumer.endOffsets(...).
36 private final long[] currentOffset;
// Per partition: offset following the last handled record (-1 until initialised).
37 private final long[] nextOffset;
// Known chat-rooms, keyed by chat-room id.
38 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
// ASCII string form of the instance URI; embedded in shard assigned/revoked events.
39 private final String instanceUri;
// NOTE(review): plain (non-volatile) flag; its readers and writers are not visible
// in this excerpt — confirm whether it is shared between threads.
41 private boolean running;
// Constructor (remaining parameters and body): stores the consumer and producer,
// creates the empty chat-room map and allocates the three per-shard arrays with
// numShards entries each.
46 Producer<String, AbstractMessageTo> producer,
47 Consumer<String, AbstractMessageTo> infoChannelConsumer,
51 "Creating InfoChannel for topic {}",
54 this.consumer = infoChannelConsumer;
55 this.producer = producer;
56 this.chatRoomInfo = new HashMap<>();
// numShards is derived from the consumer; the rest of the expression is not
// visible here. Presumably the partition count of the topic — TODO confirm.
58 this.numShards = consumer
61 this.shardOwners = new String[numShards];
62 this.currentOffset = new long[numShards];
63 this.nextOffset = new long[numShards];
// nextOffset starts at -1 while currentOffset keeps its array default of 0, so
// the comparison in loadInProgress() (nextOffset < currentOffset) holds for
// every partition touched here until offsets have been processed.
66 .forEach(partition -> this.nextOffset[partition] = -1l);
// Keep the instance URI in its ASCII string form for use in shard events.
68 this.instanceUri = instanceUri.toASCIIString();
// Reports whether loading is still in progress: true if, for any partition the
// stream iterates over, nextOffset is still below currentOffset.
72 boolean loadInProgress()
76 .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
// Publishes an EventChatRoomCreated built from chatRoomId, name and shard.
// The send is wrapped in Mono.create, so it is triggered per subscription.
// On success the returned Mono emits a new ChatRoomInfo for the same values;
// on failure it signals the exception passed to the producer callback.
79 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
84 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
85 return Mono.create(sink ->
87 ProducerRecord<String, AbstractMessageTo> record =
// The shard number, rendered as a string, is passed to the record.
90 Integer.toString(shard),
93 producer.send(record, ((metadata, exception) ->
// Success path. The guarding condition is not visible in this excerpt —
// presumably metadata != null, as in the shard event methods; TODO confirm.
97 log.info("Successfully sent created event for chat chat-room: {}", to);
98 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
99 sink.success(chatRoomInfo);
// Failure path: log and propagate the producer's exception to the subscriber.
105 "Could not send created event for chat-room (id={}, name={}): {}",
109 sink.error(exception);
// Publishes an EventShardAssigned for the given shard, carrying this instance's
// URI. Returns a Mono that emits the RecordMetadata of the sent record, or the
// exception reported by the producer callback.
// NOTE(review): unlike sendChatRoomCreatedEvent (Mono.create), producer.send is
// invoked directly here, i.e. when the method is called rather than when the
// returned Mono is subscribed — confirm this is intended.
115 Mono<RecordMetadata> sendShardAssignedEvent(int shard)
117 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
// NOTE(review): raw type — Sinks.One<RecordMetadata> would avoid the unchecked
// conversion at sink.asMono(). Sinks.unsafe() is the unsafe sink factory variant;
// the emits below happen inside the producer callback.
119 Sinks.One sink = Sinks.unsafe().one();
121 ProducerRecord<String, AbstractMessageTo> record =
122 new ProducerRecord<>(
124 Integer.toString(shard),
127 producer.send(record, ((metadata, exception) ->
// Non-null metadata is treated as a successful send.
129 if (metadata != null)
131 log.info("Successfully sent shard assigned event for shard: {}", shard);
132 sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST);
// Failure path: log and forward the exception through the sink.
138 "Could not send shard assigned event for shard {}: {}",
141 sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST);
145 return sink.asMono();
// Publishes an EventShardRevoked for the given shard, carrying this instance's
// URI. Returns a Mono that emits the RecordMetadata of the sent record, or the
// exception reported by the producer callback.
// NOTE(review): producer.send is invoked directly when this method is called,
// not on subscription of the returned Mono — confirm this is intended.
148 Mono<RecordMetadata> sendShardRevokedEvent(int shard)
150 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
// NOTE(review): raw type — Sinks.One<RecordMetadata> would avoid the unchecked
// conversion at sink.asMono().
152 Sinks.One sink = Sinks.unsafe().one();
154 ProducerRecord<String, AbstractMessageTo> record =
155 new ProducerRecord<>(
157 Integer.toString(shard),
160 producer.send(record, ((metadata, exception) ->
// Non-null metadata is treated as a successful send.
162 if (metadata != null)
164 log.info("Successfully sent shard revoked event for shard: {}", shard);
165 sink.emitValue(metadata, Sinks.EmitFailureHandler.FAIL_FAST);
// Failure path: log and forward the exception through the sink.
171 "Could not send shard revoked event for shard {}: {}",
174 sink.emitError(exception, Sinks.EmitFailureHandler.FAIL_FAST);
178 return sink.asMono();
// Consumer loop body (method header and loop construct are not visible in this
// excerpt). First records the end offset of every assigned partition in
// currentOffset and sets nextOffset to 0, then polls and handles records.
188 .endOffsets(consumer.assignment())
191 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
194 .forEach(partition -> this.nextOffset[partition] = 0l);
// Poll with a timeout of one minute and hand the fetched batch to handleMessages.
200 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
201 log.debug("Fetched {} messages", records.count());
202 handleMessages(records);
// A WakeupException is logged at info level; per the log message it ends the loop.
204 catch (WakeupException e)
206 log.info("Received WakeupException, exiting!");
211 log.info("Exiting normally");
// Applies a batch of fetched records to the in-memory state, dispatching on the
// message type, and records the next offset for the partition of each record.
214 private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
216 for (ConsumerRecord<String, AbstractMessageTo> record : records)
// NOTE(review): record.value() is dereferenced without a null check.
218 switch (record.value().getType())
// A created chat-room is converted to ChatRoomInfo and registered.
220 case EVENT_CHATROOM_CREATED:
221 EventChatRoomCreated eventChatRoomCreated =
222 (EventChatRoomCreated) record.value();
223 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
// An assignment stores the URI from the event as the owner of the shard.
226 case EVENT_SHARD_ASSIGNED:
227 EventShardAssigned eventShardAssigned =
228 (EventShardAssigned) record.value();
230 "Shard {} was assigned to {}",
231 eventShardAssigned.getShard(),
232 eventShardAssigned.getUri());
// NOTE(review): the shard from the event is used as array index unchecked.
233 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
// A revocation clears the owner of the shard.
236 case EVENT_SHARD_REVOKED:
237 EventShardRevoked eventShardRevoked =
238 (EventShardRevoked) record.value();
240 "Shard {} was revoked from {}",
241 eventShardRevoked.getShard(),
242 eventShardRevoked.getUri());
// NOTE(review): the owner is cleared without comparing it to the URI in the
// event, so a revoke from a previous owner would also clear a newer owner —
// confirm this cannot happen with the event ordering in use.
243 shardOwners[eventShardRevoked.getShard()] = null;
// Any other message type is only logged.
248 "Ignoring message for key={} with offset={}: {}",
// Remember the offset following this record for its partition.
254 nextOffset[record.partition()] = record.offset() + 1;
// Registers the given chat-room in the chatRoomInfo map under its id. A
// chat-room whose id is already known is reported via the "Ignoring existing
// chat-room" log message.
258 private void createChatRoom(ChatRoomInfo chatRoomInfo)
260 UUID chatRoomId = chatRoomInfo.getId();
261 Integer partition = chatRoomInfo.getShard();
// this.chatRoomInfo is the map field; the parameter of the same name shadows it.
263 if (this.chatRoomInfo.containsKey(chatRoomId))
266 "Ignoring existing chat-room for {}: {}",
273 "Adding new chat-room for partition {}: {}",
// NOTE(review): the branch structure is not visible in this excerpt; presumably
// the put only happens when the id was not known yet — TODO confirm.
277 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
// Returns a Flux over the values of the chatRoomInfo map.
// NOTE(review): this wraps the live values() view of a HashMap that
// createChatRoom mutates — confirm both are only ever used from the same thread.
281 Flux<ChatRoomInfo> getChatRoomInfo()
283 return Flux.fromIterable(chatRoomInfo.values());
// Looks up the chat-room with the given id. The lookup is performed by the
// supplier, i.e. when the Mono is subscribed; a null result from the map makes
// the Mono complete empty (Mono.fromSupplier semantics).
286 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
288 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
// Returns a Mono emitting the shardOwners array.
// NOTE(review): the internal array itself is emitted, not a copy, so subscribers
// see later updates and could modify it — confirm this is intended.
291 Mono<String[]> getShardOwners()
293 return Mono.just(shardOwners);