1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
9 import lombok.ToString;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRecord;
13 import org.apache.kafka.clients.consumer.ConsumerRecords;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerRecord;
16 import org.apache.kafka.common.errors.WakeupException;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
21 import java.time.Duration;
22 import java.util.HashMap;
24 import java.util.UUID;
25 import java.util.stream.IntStream;
28 @ToString(of = { "topic", "instanceUri" })
30 public class InfoChannel implements Channel
32 private final String topic;
33 private final Producer<String, AbstractMessageTo> producer;
34 private final Consumer<String, AbstractMessageTo> consumer;
35 private final Duration pollingInterval;
36 private final int numShards;
37 private final String[] shardOwners;
38 private final long[] currentOffset;
39 private final long[] nextOffset;
40 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
41 private final String instanceUri;
42 private final ChannelMediator channelMediator;
44 private boolean running;
46 private volatile ChannelState channelState = ChannelState.STARTING;
51 Producer<String, AbstractMessageTo> producer,
52 Consumer<String, AbstractMessageTo> infoChannelConsumer,
53 Duration pollingInterval,
56 ChannelMediator channelMediator)
59 "Creating InfoChannel for topic {}",
62 this.consumer = infoChannelConsumer;
63 this.producer = producer;
64 this.chatRoomInfo = new HashMap<>();
66 this.pollingInterval = pollingInterval;
68 this.numShards = numShards;
69 this.shardOwners = new String[numShards];
70 this.currentOffset = new long[numShards];
71 this.nextOffset = new long[numShards];
74 .forEach(partition -> this.nextOffset[partition] = -1l);
76 this.instanceUri = instanceUri.toASCIIString();
78 this.channelMediator = channelMediator;
82 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
87 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
88 return Mono.create(sink ->
90 ProducerRecord<String, AbstractMessageTo> record =
93 Integer.toString(shard),
96 producer.send(record, ((metadata, exception) ->
98 if (exception == null)
100 log.info("Successfully sent created event for chat chat-room: {}", to);
101 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
102 sink.success(chatRoomInfo);
108 "Could not send created event for chat-room (id={}, name={}): {}",
112 sink.error(exception);
118 void sendShardAssignedEvent(int shard)
120 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
122 ProducerRecord<String, AbstractMessageTo> record =
123 new ProducerRecord<>(
125 Integer.toString(shard),
128 producer.send(record, ((metadata, exception) ->
130 if (metadata != null)
132 log.info("Successfully sent shard assigned event for shard: {}", shard);
138 "Could not send shard assigned event for shard {}: {}",
142 // Verhalten im Fehlerfall durchdenken!
143 // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
144 // Consumers veranlassen, so dass die nicht öffentlich Bekannte
145 // Zuständigkeit abgegeben und neu zugeordnet wird?
146 // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
151 void sendShardRevokedEvent(int shard)
153 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
155 ProducerRecord<String, AbstractMessageTo> record =
156 new ProducerRecord<>(
158 Integer.toString(shard),
161 producer.send(record, ((metadata, exception) ->
163 if (metadata != null)
165 log.info("Successfully sent shard revoked event for shard: {}", shard);
171 "Could not send shard revoked event for shard {}: {}",
175 // Verhalten im Fehlerfall durchdenken!
176 // Ggf. einfach egal, da die neue zuständige Instanz den
177 // nicht gelöschten Eintrag eh überschreibt?
189 .endOffsets(consumer.assignment())
192 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
195 .forEach(partition -> this.nextOffset[partition] = 0l);
196 channelState = ChannelState.LOAD_IN_PROGRESS;
202 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
203 log.debug("Fetched {} messages", records.count());
204 for (ConsumerRecord<String, AbstractMessageTo> record : records)
206 handleMessage(record);
207 updateNextOffset(record.partition(), record.offset() + 1);
210 catch (WakeupException e)
212 log.info("Received WakeupException, exiting!");
213 channelState = ChannelState.SHUTTING_DOWN;
218 log.info("Exiting normally");
221 private void updateNextOffset(int partition, long nextOffset)
223 this.nextOffset[partition] = nextOffset;
224 if (channelState == ChannelState.LOAD_IN_PROGRESS)
226 boolean loadInProgress = IntStream
228 .anyMatch(shard -> this.nextOffset[shard] < currentOffset[shard]);
231 log.info("Loading of info completed! Resuming normal operations...");
232 channelState = ChannelState.READY;
237 private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
239 switch (record.value().getType())
241 case EVENT_CHATROOM_CREATED:
242 EventChatRoomCreated eventChatRoomCreated =
243 (EventChatRoomCreated) record.value();
244 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
247 case EVENT_SHARD_ASSIGNED:
248 EventShardAssigned eventShardAssigned =
249 (EventShardAssigned) record.value();
251 "Shard {} was assigned to {}",
252 eventShardAssigned.getShard(),
253 eventShardAssigned.getUri());
254 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
257 case EVENT_SHARD_REVOKED:
258 EventShardRevoked eventShardRevoked =
259 (EventShardRevoked) record.value();
261 "Shard {} was revoked from {}",
262 eventShardRevoked.getShard(),
263 eventShardRevoked.getUri());
264 shardOwners[eventShardRevoked.getShard()] = null;
269 "Ignoring message for key={} with offset={}: {}",
276 private void createChatRoom(ChatRoomInfo chatRoomInfo)
278 UUID chatRoomId = chatRoomInfo.getId();
279 Integer partition = chatRoomInfo.getShard();
281 if (this.chatRoomInfo.containsKey(chatRoomId))
284 "Ignoring existing chat-room for {}: {}",
291 "Adding new chat-room for partition {}: {}",
295 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
296 this.channelMediator.chatRoomCreated(chatRoomInfo);
300 Flux<ChatRoomInfo> getChatRoomInfo()
302 return Flux.fromIterable(chatRoomInfo.values());
305 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
307 ChannelState capturedState = channelState;
308 if (capturedState != ChannelState.READY)
310 return Mono.error(new ChannelNotReadyException(capturedState));
313 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
316 Mono<String[]> getShardOwners()
318 return Mono.just(shardOwners);