1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRecord;
13 import org.apache.kafka.clients.consumer.ConsumerRecords;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerRecord;
16 import org.apache.kafka.clients.producer.RecordMetadata;
17 import org.apache.kafka.common.errors.WakeupException;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
23 import java.util.HashMap;
25 import java.util.UUID;
26 import java.util.stream.IntStream;
30 public class InfoChannel implements Runnable
32 private final String topic;
33 private final Producer<String, AbstractMessageTo> producer;
34 private final Consumer<String, AbstractMessageTo> consumer;
35 private final Duration pollingInterval;
36 private final int numShards;
37 private final String[] shardOwners;
38 private final long[] currentOffset;
39 private final long[] nextOffset;
40 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
41 private final String instanceUri;
43 private boolean running;
45 private volatile boolean loadInProgress = true;
50 Producer<String, AbstractMessageTo> producer,
51 Consumer<String, AbstractMessageTo> infoChannelConsumer,
52 Duration pollingInterval,
57 "Creating InfoChannel for topic {}",
60 this.consumer = infoChannelConsumer;
61 this.producer = producer;
62 this.chatRoomInfo = new HashMap<>();
64 this.pollingInterval = pollingInterval;
66 this.numShards = numShards;
67 this.shardOwners = new String[numShards];
68 this.currentOffset = new long[numShards];
69 this.nextOffset = new long[numShards];
72 .forEach(partition -> this.nextOffset[partition] = -1l);
74 this.instanceUri = instanceUri.toASCIIString();
78 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
83 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
84 return Mono.create(sink ->
86 ProducerRecord<String, AbstractMessageTo> record =
89 Integer.toString(shard),
92 producer.send(record, ((metadata, exception) ->
94 if (exception == null)
96 log.info("Successfully sent created event for chat chat-room: {}", to);
97 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
98 sink.success(chatRoomInfo);
104 "Could not send created event for chat-room (id={}, name={}): {}",
108 sink.error(exception);
114 void sendShardAssignedEvent(int shard)
116 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
118 ProducerRecord<String, AbstractMessageTo> record =
119 new ProducerRecord<>(
121 Integer.toString(shard),
124 producer.send(record, ((metadata, exception) ->
126 if (metadata != null)
128 log.info("Successfully sent shard assigned event for shard: {}", shard);
134 "Could not send shard assigned event for shard {}: {}",
138 // Verhalten im Fehlerfall durchdenken!
139 // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
140 // Consumers veranlassen, so dass die nicht öffentlich Bekannte
141 // Zuständigkeit abgegeben und neu zugeordnet wird?
142 // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
147 void sendShardRevokedEvent(int shard)
149 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
151 ProducerRecord<String, AbstractMessageTo> record =
152 new ProducerRecord<>(
154 Integer.toString(shard),
157 producer.send(record, ((metadata, exception) ->
159 if (metadata != null)
161 log.info("Successfully sent shard revoked event for shard: {}", shard);
167 "Could not send shard revoked event for shard {}: {}",
171 // Verhalten im Fehlerfall durchdenken!
172 // Ggf. einfach egal, da die neue zuständige Instanz den
173 // nicht gelöschten Eintrag eh überschreibt?
185 .endOffsets(consumer.assignment())
188 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
191 .forEach(partition -> this.nextOffset[partition] = 0l);
192 loadInProgress = true;
198 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
199 log.debug("Fetched {} messages", records.count());
200 for (ConsumerRecord<String, AbstractMessageTo> record : records)
202 handleMessage(record);
203 updateNextOffset(record.partition(), record.offset() + 1);
206 catch (WakeupException e)
208 log.info("Received WakeupException, exiting!");
213 log.info("Exiting normally");
216 private void updateNextOffset(int partition, long nextOffset)
218 this.nextOffset[partition] = nextOffset;
219 if (loadInProgress) {
220 loadInProgress = IntStream
222 .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
226 private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
228 switch (record.value().getType())
230 case EVENT_CHATROOM_CREATED:
231 EventChatRoomCreated eventChatRoomCreated =
232 (EventChatRoomCreated) record.value();
233 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
236 case EVENT_SHARD_ASSIGNED:
237 EventShardAssigned eventShardAssigned =
238 (EventShardAssigned) record.value();
240 "Shard {} was assigned to {}",
241 eventShardAssigned.getShard(),
242 eventShardAssigned.getUri());
243 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
246 case EVENT_SHARD_REVOKED:
247 EventShardRevoked eventShardRevoked =
248 (EventShardRevoked) record.value();
250 "Shard {} was revoked from {}",
251 eventShardRevoked.getShard(),
252 eventShardRevoked.getUri());
253 shardOwners[eventShardRevoked.getShard()] = null;
258 "Ignoring message for key={} with offset={}: {}",
265 private void createChatRoom(ChatRoomInfo chatRoomInfo)
267 UUID chatRoomId = chatRoomInfo.getId();
268 Integer partition = chatRoomInfo.getShard();
270 if (this.chatRoomInfo.containsKey(chatRoomId))
273 "Ignoring existing chat-room for {}: {}",
280 "Adding new chat-room for partition {}: {}",
284 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
288 Flux<ChatRoomInfo> getChatRoomInfo()
290 return Flux.fromIterable(chatRoomInfo.values());
293 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
297 return Mono.error(new LoadInProgressException());
300 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
303 Mono<String[]> getShardOwners()
305 return Mono.just(shardOwners);