1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.clients.producer.RecordMetadata;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
21 import java.util.HashMap;
23 import java.util.UUID;
24 import java.util.stream.IntStream;
28 public class InfoChannel implements Runnable
30 private final String topic;
31 private final Producer<String, AbstractMessageTo> producer;
32 private final Consumer<String, AbstractMessageTo> consumer;
33 private final int numShards;
34 private final String[] shardOwners;
35 private final long[] currentOffset;
36 private final long[] nextOffset;
37 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
38 private final String instanceUri;
40 private boolean running;
45 Producer<String, AbstractMessageTo> producer,
46 Consumer<String, AbstractMessageTo> infoChannelConsumer,
50 "Creating InfoChannel for topic {}",
53 this.consumer = infoChannelConsumer;
54 this.producer = producer;
55 this.chatRoomInfo = new HashMap<>();
57 this.numShards = consumer
60 this.shardOwners = new String[numShards];
61 this.currentOffset = new long[numShards];
62 this.nextOffset = new long[numShards];
65 .forEach(partition -> this.nextOffset[partition] = -1l);
67 this.instanceUri = instanceUri.toASCIIString();
71 boolean isLoadInProgress()
75 .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
78 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
83 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
84 return Mono.create(sink ->
86 ProducerRecord<String, AbstractMessageTo> record =
89 Integer.toString(shard),
92 producer.send(record, ((metadata, exception) ->
94 if (exception == null)
96 log.info("Successfully sent created event for chat chat-room: {}", to);
97 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
98 sink.success(chatRoomInfo);
104 "Could not send created event for chat-room (id={}, name={}): {}",
108 sink.error(exception);
114 void sendShardAssignedEvent(int shard)
116 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
118 ProducerRecord<String, AbstractMessageTo> record =
119 new ProducerRecord<>(
121 Integer.toString(shard),
124 producer.send(record, ((metadata, exception) ->
126 if (metadata != null)
128 log.info("Successfully sent shard assigned event for shard: {}", shard);
134 "Could not send shard assigned event for shard {}: {}",
138 // Verhalten im Fehlerfall durchdenken!
139 // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
140 // Consumers veranlassen, so dass die nicht öffentlich Bekannte
141 // Zuständigkeit abgegeben und neu zugeordnet wird?
142 // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
147 void sendShardRevokedEvent(int shard)
149 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
151 ProducerRecord<String, AbstractMessageTo> record =
152 new ProducerRecord<>(
154 Integer.toString(shard),
157 producer.send(record, ((metadata, exception) ->
159 if (metadata != null)
161 log.info("Successfully sent shard revoked event for shard: {}", shard);
167 "Could not send shard revoked event for shard {}: {}",
171 // Verhalten im Fehlerfall durchdenken!
172 // Ggf. einfach egal, da die neue zuständige Instanz den
173 // nicht gelöschten Eintrag eh überschreibt?
185 .endOffsets(consumer.assignment())
188 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
191 .forEach(partition -> this.nextOffset[partition] = 0l);
197 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
198 log.debug("Fetched {} messages", records.count());
199 handleMessages(records);
201 catch (WakeupException e)
203 log.info("Received WakeupException, exiting!");
208 log.info("Exiting normally");
211 private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
213 for (ConsumerRecord<String, AbstractMessageTo> record : records)
215 switch (record.value().getType())
217 case EVENT_CHATROOM_CREATED:
218 EventChatRoomCreated eventChatRoomCreated =
219 (EventChatRoomCreated) record.value();
220 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
223 case EVENT_SHARD_ASSIGNED:
224 EventShardAssigned eventShardAssigned =
225 (EventShardAssigned) record.value();
227 "Shard {} was assigned to {}",
228 eventShardAssigned.getShard(),
229 eventShardAssigned.getUri());
230 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
233 case EVENT_SHARD_REVOKED:
234 EventShardRevoked eventShardRevoked =
235 (EventShardRevoked) record.value();
237 "Shard {} was revoked from {}",
238 eventShardRevoked.getShard(),
239 eventShardRevoked.getUri());
240 shardOwners[eventShardRevoked.getShard()] = null;
245 "Ignoring message for key={} with offset={}: {}",
251 nextOffset[record.partition()] = record.offset() + 1;
255 private void createChatRoom(ChatRoomInfo chatRoomInfo)
257 UUID chatRoomId = chatRoomInfo.getId();
258 Integer partition = chatRoomInfo.getShard();
260 if (this.chatRoomInfo.containsKey(chatRoomId))
263 "Ignoring existing chat-room for {}: {}",
270 "Adding new chat-room for partition {}: {}",
274 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
278 Flux<ChatRoomInfo> getChatRoomInfo()
280 return Flux.fromIterable(chatRoomInfo.values());
283 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
285 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
288 Mono<String[]> getShardOwners()
290 return Mono.just(shardOwners);