1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
8 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.ConsumerRecord;
13 import org.apache.kafka.clients.consumer.ConsumerRecords;
14 import org.apache.kafka.clients.producer.Producer;
15 import org.apache.kafka.clients.producer.ProducerRecord;
16 import org.apache.kafka.clients.producer.RecordMetadata;
17 import org.apache.kafka.common.errors.WakeupException;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
23 import java.util.HashMap;
25 import java.util.UUID;
26 import java.util.stream.IntStream;
30 public class InfoChannel implements Runnable
32 private final String topic;
33 private final Producer<String, AbstractMessageTo> producer;
34 private final Consumer<String, AbstractMessageTo> consumer;
35 private final int numShards;
36 private final String[] shardOwners;
37 private final long[] currentOffset;
38 private final long[] nextOffset;
39 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
40 private final String instanceUri;
42 private boolean running;
44 private volatile boolean loadInProgress = true;
49 Producer<String, AbstractMessageTo> producer,
50 Consumer<String, AbstractMessageTo> infoChannelConsumer,
54 "Creating InfoChannel for topic {}",
57 this.consumer = infoChannelConsumer;
58 this.producer = producer;
59 this.chatRoomInfo = new HashMap<>();
61 this.numShards = consumer
64 this.shardOwners = new String[numShards];
65 this.currentOffset = new long[numShards];
66 this.nextOffset = new long[numShards];
69 .forEach(partition -> this.nextOffset[partition] = -1l);
71 this.instanceUri = instanceUri.toASCIIString();
75 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
80 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
81 return Mono.create(sink ->
83 ProducerRecord<String, AbstractMessageTo> record =
86 Integer.toString(shard),
89 producer.send(record, ((metadata, exception) ->
91 if (exception == null)
93 log.info("Successfully sent created event for chat chat-room: {}", to);
94 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
95 sink.success(chatRoomInfo);
101 "Could not send created event for chat-room (id={}, name={}): {}",
105 sink.error(exception);
111 void sendShardAssignedEvent(int shard)
113 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
115 ProducerRecord<String, AbstractMessageTo> record =
116 new ProducerRecord<>(
118 Integer.toString(shard),
121 producer.send(record, ((metadata, exception) ->
123 if (metadata != null)
125 log.info("Successfully sent shard assigned event for shard: {}", shard);
131 "Could not send shard assigned event for shard {}: {}",
135 // Verhalten im Fehlerfall durchdenken!
136 // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
137 // Consumers veranlassen, so dass die nicht öffentlich Bekannte
138 // Zuständigkeit abgegeben und neu zugeordnet wird?
139 // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
144 void sendShardRevokedEvent(int shard)
146 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
148 ProducerRecord<String, AbstractMessageTo> record =
149 new ProducerRecord<>(
151 Integer.toString(shard),
154 producer.send(record, ((metadata, exception) ->
156 if (metadata != null)
158 log.info("Successfully sent shard revoked event for shard: {}", shard);
164 "Could not send shard revoked event for shard {}: {}",
168 // Verhalten im Fehlerfall durchdenken!
169 // Ggf. einfach egal, da die neue zuständige Instanz den
170 // nicht gelöschten Eintrag eh überschreibt?
182 .endOffsets(consumer.assignment())
185 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
188 .forEach(partition -> this.nextOffset[partition] = 0l);
189 loadInProgress = true;
195 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
196 log.debug("Fetched {} messages", records.count());
197 for (ConsumerRecord<String, AbstractMessageTo> record : records)
199 handleMessage(record);
200 updateNextOffset(record.partition(), record.offset() + 1);
203 catch (WakeupException e)
205 log.info("Received WakeupException, exiting!");
210 log.info("Exiting normally");
213 private void updateNextOffset(int partition, long nextOffset)
215 this.nextOffset[partition] = nextOffset;
216 if (loadInProgress) {
217 loadInProgress = IntStream
219 .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
223 private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
225 switch (record.value().getType())
227 case EVENT_CHATROOM_CREATED:
228 EventChatRoomCreated eventChatRoomCreated =
229 (EventChatRoomCreated) record.value();
230 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
233 case EVENT_SHARD_ASSIGNED:
234 EventShardAssigned eventShardAssigned =
235 (EventShardAssigned) record.value();
237 "Shard {} was assigned to {}",
238 eventShardAssigned.getShard(),
239 eventShardAssigned.getUri());
240 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
243 case EVENT_SHARD_REVOKED:
244 EventShardRevoked eventShardRevoked =
245 (EventShardRevoked) record.value();
247 "Shard {} was revoked from {}",
248 eventShardRevoked.getShard(),
249 eventShardRevoked.getUri());
250 shardOwners[eventShardRevoked.getShard()] = null;
255 "Ignoring message for key={} with offset={}: {}",
262 private void createChatRoom(ChatRoomInfo chatRoomInfo)
264 UUID chatRoomId = chatRoomInfo.getId();
265 Integer partition = chatRoomInfo.getShard();
267 if (this.chatRoomInfo.containsKey(chatRoomId))
270 "Ignoring existing chat-room for {}: {}",
277 "Adding new chat-room for partition {}: {}",
281 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
285 Flux<ChatRoomInfo> getChatRoomInfo()
287 return Flux.fromIterable(chatRoomInfo.values());
290 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
294 return Mono.error(new LoadInProgressException());
297 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
300 Mono<String[]> getShardOwners()
302 return Mono.just(shardOwners);