1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
9 import lombok.extern.slf4j.Slf4j;
10 import org.apache.kafka.clients.consumer.Consumer;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.clients.producer.RecordMetadata;
16 import org.apache.kafka.common.errors.WakeupException;
17 import reactor.core.publisher.Flux;
18 import reactor.core.publisher.Mono;
22 import java.util.HashMap;
24 import java.util.UUID;
25 import java.util.stream.IntStream;
// Runnable that polls the info topic with a Kafka consumer, keeps the known
// chat-rooms and the current shard owners in memory, and publishes
// chat-room / shard events through a Kafka producer.
29 public class InfoChannel implements Runnable
// Handed to the constructor together with the clients; presumably the name
// of the info topic used for the records sent below -- the assignment and
// its uses are not visible in this excerpt.
31 private final String topic;
// Producer used to publish the created / assigned / revoked events.
32 private final Producer<String, AbstractMessageTo> producer;
// Consumer that is polled in the run loop and asked for the end offsets.
33 private final Consumer<String, AbstractMessageTo> consumer;
// Number of shards; derived from the consumer in the constructor and used to
// size the per-shard arrays below.
34 private final int numShards;
// Indexed by shard: URI taken from the last handled EventShardAssigned,
// or null after an EventShardRevoked for that shard was handled.
35 private final String[] shardOwners;
// Indexed by partition: end offset reported by consumer.endOffsets()
// when the run loop starts.
36 private final long[] currentOffset;
// Indexed by partition: offset following the last handled record
// (initialised to -1 in the constructor, reset to 0 when the run loop starts).
37 private final long[] nextOffset;
// Known chat-rooms by id. Backed by a plain HashMap.
// NOTE(review): written while handling records and read by the
// getChatRoomInfo() accessors -- confirm both happen on the same thread.
38 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
// ASCII string form of this instance's URI; embedded in the shard events.
39 private final String instanceUri;
// Loop flag of the run loop; the places where it is written are not visible
// in this excerpt. NOTE(review): not volatile -- confirm it is only touched
// by the polling thread.
41 private boolean running;
// True while the initial load is considered incomplete; cleared by
// updateNextOffset() once its catch-up check no longer finds a lagging shard.
43 private volatile boolean loadInProgress = true;
// Constructor (fragment -- the signature line and some statements are not
// visible in this excerpt). Stores the Kafka clients and allocates the
// per-shard bookkeeping arrays.
48 Producer<String, AbstractMessageTo> producer,
49 Consumer<String, AbstractMessageTo> infoChannelConsumer,
53 "Creating InfoChannel for topic {}",
56 this.consumer = infoChannelConsumer;
57 this.producer = producer;
// Plain HashMap: no synchronisation of its own.
58 this.chatRoomInfo = new HashMap<>();
// The shard count is obtained from the consumer; the rest of this expression
// is not visible here (presumably the partition count of the topic -- TODO confirm).
60 this.numShards = consumer
// One slot per shard in each of the bookkeeping arrays.
63 this.shardOwners = new String[numShards];
64 this.currentOffset = new long[numShards];
65 this.nextOffset = new long[numShards];
// Mark the visited partitions as "nothing read yet" with -1 (the source of
// the stream is not visible here; presumably all shards).
68 .forEach(partition -> this.nextOffset[partition] = -1l);
// Keep only the ASCII string form of the instance URI.
70 this.instanceUri = instanceUri.toASCIIString();
// Publishes an EventChatRoomCreated and reports the outcome of the send
// through the returned Mono: it succeeds with a ChatRoomInfo built from the
// same id, name and shard, or fails with the exception of the producer.
// (The parameter list is not visible in this excerpt; the body uses
// chatRoomId, name and shard.)
74 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
79 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
// The send is issued inside the Mono.create callback; the sink is completed
// from the send callback of the producer.
80 return Mono.create(sink ->
82 ProducerRecord<String, AbstractMessageTo> record =
// The shard number, rendered as a string, is passed to the record
// (presumably as its key -- the other constructor arguments are not visible).
85 Integer.toString(shard),
88 producer.send(record, ((metadata, exception) ->
// No exception: the send is treated as successful.
90 if (exception == null)
92 log.info("Successfully sent created event for chat chat-room: {}", to);
93 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
94 sink.success(chatRoomInfo);
// Failure path: log the problem and forward the exception to the subscriber.
100 "Could not send created event for chat-room (id={}, name={}): {}",
104 sink.error(exception);
110 void sendShardAssignedEvent(int shard)
112 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
114 ProducerRecord<String, AbstractMessageTo> record =
115 new ProducerRecord<>(
117 Integer.toString(shard),
120 producer.send(record, ((metadata, exception) ->
122 if (metadata != null)
124 log.info("Successfully sent shard assigned event for shard: {}", shard);
130 "Could not send shard assigned event for shard {}: {}",
134 // Verhalten im Fehlerfall durchdenken!
135 // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
136 // Consumers veranlassen, so dass die nicht öffentlich Bekannte
137 // Zuständigkeit abgegeben und neu zugeordnet wird?
138 // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
143 void sendShardRevokedEvent(int shard)
145 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
147 ProducerRecord<String, AbstractMessageTo> record =
148 new ProducerRecord<>(
150 Integer.toString(shard),
153 producer.send(record, ((metadata, exception) ->
155 if (metadata != null)
157 log.info("Successfully sent shard revoked event for shard: {}", shard);
163 "Could not send shard revoked event for shard {}: {}",
167 // Verhalten im Fehlerfall durchdenken!
168 // Ggf. einfach egal, da die neue zuständige Instanz den
169 // nicht gelöschten Eintrag eh überschreibt?
// Poll loop (fragment -- the method header and the loop/try framing are not
// visible in this excerpt; presumably the body of Runnable.run()).
// Capture the end offset of every currently assigned partition in
// currentOffset, indexed by partition number.
181 .endOffsets(consumer.assignment())
184 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
// Reset the next offset of the visited partitions to 0 (the source of the
// stream is not visible here; presumably all shards).
187 .forEach(partition -> this.nextOffset[partition] = 0l);
// Flag the initial load as running.
// NOTE(review): in the visible code the flag is only re-evaluated by
// updateNextOffset(), which is called per received record -- if no record
// ever arrives it appears to stay true. Confirm that this is intended.
188 loadInProgress = true;
// Wait up to one minute for new records.
194 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
195 log.debug("Fetched {} messages", records.count());
196 for (ConsumerRecord<String, AbstractMessageTo> record : records)
// Apply the record first, then remember the offset that follows it.
198 handleMessage(record);
199 updateNextOffset(record.partition(), record.offset() + 1);
// A WakeupException is treated as the request to leave the loop
// (presumably triggered via consumer.wakeup() elsewhere -- not visible here).
202 catch (WakeupException e)
204 log.info("Received WakeupException, exiting!");
209 log.info("Exiting normally");
212 private void updateNextOffset(int partition, long nextOffset)
214 this.nextOffset[partition] = nextOffset;
215 if (loadInProgress) {
216 loadInProgress = IntStream
218 .anyMatch(shard -> this.nextOffset[shard] < currentOffset[partition]);
// Applies a single consumed record to the in-memory state, dispatching on
// the message type reported by the value of the record.
222 private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
224 switch (record.value().getType())
// A chat-room was created: convert the event into a ChatRoomInfo and
// register it.
226 case EVENT_CHATROOM_CREATED:
227 EventChatRoomCreated eventChatRoomCreated =
228 (EventChatRoomCreated) record.value();
229 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
// A shard was assigned: remember the URI carried by the event as the owner
// of that shard.
232 case EVENT_SHARD_ASSIGNED:
233 EventShardAssigned eventShardAssigned =
234 (EventShardAssigned) record.value();
236 "Shard {} was assigned to {}",
237 eventShardAssigned.getShard(),
238 eventShardAssigned.getUri());
239 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
// A shard was revoked: clear the owner of that shard.
// NOTE(review): the entry is cleared without comparing the URI of the event
// with the currently stored owner -- confirm that a late revoke cannot wipe
// the entry of a newer owner.
242 case EVENT_SHARD_REVOKED:
243 EventShardRevoked eventShardRevoked =
244 (EventShardRevoked) record.value();
246 "Shard {} was revoked from {}",
247 eventShardRevoked.getShard(),
248 eventShardRevoked.getUri());
249 shardOwners[eventShardRevoked.getShard()] = null;
// Any other message is only logged (the branch label and the logging call
// are not fully visible in this excerpt).
254 "Ignoring message for key={} with offset={}: {}",
// Registers a chat-room in the in-memory map, unless an entry for its id
// already exists -- an existing entry is kept and the new info is only logged.
261 private void createChatRoom(ChatRoomInfo chatRoomInfo)
263 UUID chatRoomId = chatRoomInfo.getId();
// The shard of the chat-room; presumably only used for the log output below
// (the arguments of the logging calls are not visible in this excerpt).
264 Integer partition = chatRoomInfo.getShard();
// First writer wins: an id that is already known is never overwritten.
266 if (this.chatRoomInfo.containsKey(chatRoomId))
269 "Ignoring existing chat-room for {}: {}",
276 "Adding new chat-room for partition {}: {}",
280 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
284 Flux<ChatRoomInfo> getChatRoomInfo()
286 return Flux.fromIterable(chatRoomInfo.values());
289 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
291 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
294 Mono<String[]> getShardOwners()
296 return Mono.just(shardOwners);