1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRecord;
13 import org.apache.kafka.clients.consumer.ConsumerRecords;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerRecord;
16 import org.apache.kafka.clients.producer.RecordMetadata;
17 import org.apache.kafka.common.errors.WakeupException;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
23 import java.util.HashMap;
25 import java.util.UUID;
26 import java.util.stream.IntStream;
30 public class InfoChannel implements Runnable
32 private final String topic;
33 private final Producer<String, AbstractMessageTo> producer;
34 private final Consumer<String, AbstractMessageTo> consumer;
35 private final Duration pollingInterval;
36 private final int numShards;
37 private final String[] shardOwners;
38 private final long[] currentOffset;
39 private final long[] nextOffset;
40 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
41 private final String instanceUri;
42 private final ChannelMediator channelMediator;
44 private boolean running;
46 private volatile boolean loadInProgress = true;
51 Producer<String, AbstractMessageTo> producer,
52 Consumer<String, AbstractMessageTo> infoChannelConsumer,
53 Duration pollingInterval,
56 ChannelMediator channelMediator)
59 "Creating InfoChannel for topic {}",
62 this.consumer = infoChannelConsumer;
63 this.producer = producer;
64 this.chatRoomInfo = new HashMap<>();
66 this.pollingInterval = pollingInterval;
68 this.numShards = numShards;
69 this.shardOwners = new String[numShards];
70 this.currentOffset = new long[numShards];
71 this.nextOffset = new long[numShards];
74 .forEach(partition -> this.nextOffset[partition] = -1l);
76 this.instanceUri = instanceUri.toASCIIString();
78 this.channelMediator = channelMediator;
82 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
87 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
88 return Mono.create(sink ->
90 ProducerRecord<String, AbstractMessageTo> record =
93 Integer.toString(shard),
96 producer.send(record, ((metadata, exception) ->
98 if (exception == null)
100 log.info("Successfully sent created event for chat chat-room: {}", to);
101 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
102 sink.success(chatRoomInfo);
108 "Could not send created event for chat-room (id={}, name={}): {}",
112 sink.error(exception);
118 void sendShardAssignedEvent(int shard)
120 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
122 ProducerRecord<String, AbstractMessageTo> record =
123 new ProducerRecord<>(
125 Integer.toString(shard),
128 producer.send(record, ((metadata, exception) ->
130 if (metadata != null)
132 log.info("Successfully sent shard assigned event for shard: {}", shard);
138 "Could not send shard assigned event for shard {}: {}",
142 // Verhalten im Fehlerfall durchdenken!
143 // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
144 // Consumers veranlassen, so dass die nicht öffentlich Bekannte
145 // Zuständigkeit abgegeben und neu zugeordnet wird?
146 // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
151 void sendShardRevokedEvent(int shard)
153 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
155 ProducerRecord<String, AbstractMessageTo> record =
156 new ProducerRecord<>(
158 Integer.toString(shard),
161 producer.send(record, ((metadata, exception) ->
163 if (metadata != null)
165 log.info("Successfully sent shard revoked event for shard: {}", shard);
171 "Could not send shard revoked event for shard {}: {}",
175 // Verhalten im Fehlerfall durchdenken!
176 // Ggf. einfach egal, da die neue zuständige Instanz den
177 // nicht gelöschten Eintrag eh überschreibt?
189 .endOffsets(consumer.assignment())
192 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
195 .forEach(partition -> this.nextOffset[partition] = 0l);
196 loadInProgress = true;
202 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
203 log.debug("Fetched {} messages", records.count());
204 for (ConsumerRecord<String, AbstractMessageTo> record : records)
206 handleMessage(record);
207 updateNextOffset(record.partition(), record.offset() + 1);
210 catch (WakeupException e)
212 log.info("Received WakeupException, exiting!");
217 log.info("Exiting normally");
220 private void updateNextOffset(int partition, long nextOffset)
222 this.nextOffset[partition] = nextOffset;
223 if (loadInProgress) {
224 loadInProgress = IntStream
226 .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
230 private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
232 switch (record.value().getType())
234 case EVENT_CHATROOM_CREATED:
235 EventChatRoomCreated eventChatRoomCreated =
236 (EventChatRoomCreated) record.value();
237 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
240 case EVENT_SHARD_ASSIGNED:
241 EventShardAssigned eventShardAssigned =
242 (EventShardAssigned) record.value();
244 "Shard {} was assigned to {}",
245 eventShardAssigned.getShard(),
246 eventShardAssigned.getUri());
247 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
250 case EVENT_SHARD_REVOKED:
251 EventShardRevoked eventShardRevoked =
252 (EventShardRevoked) record.value();
254 "Shard {} was revoked from {}",
255 eventShardRevoked.getShard(),
256 eventShardRevoked.getUri());
257 shardOwners[eventShardRevoked.getShard()] = null;
262 "Ignoring message for key={} with offset={}: {}",
269 private void createChatRoom(ChatRoomInfo chatRoomInfo)
271 UUID chatRoomId = chatRoomInfo.getId();
272 Integer partition = chatRoomInfo.getShard();
274 if (this.chatRoomInfo.containsKey(chatRoomId))
277 "Ignoring existing chat-room for {}: {}",
284 "Adding new chat-room for partition {}: {}",
288 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
289 this.channelMediator.chatRoomCreated(chatRoomInfo);
293 Flux<ChatRoomInfo> getChatRoomInfo()
295 return Flux.fromIterable(chatRoomInfo.values());
298 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
302 return Mono.error(new LoadInProgressException());
305 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
308 Mono<String[]> getShardOwners()
310 return Mono.just(shardOwners);