1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.clients.producer.RecordMetadata;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
21 import java.util.HashMap;
23 import java.util.UUID;
24 import java.util.stream.IntStream;
28 public class InfoChannel implements Runnable
30 private final String topic;
31 private final Producer<String, AbstractMessageTo> producer;
32 private final Consumer<String, AbstractMessageTo> consumer;
33 private final int numShards;
34 private final String[] shardOwners;
35 private final long[] currentOffset;
36 private final long[] nextOffset;
37 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
38 private final String instanceUri;
40 private boolean running;
45 Producer<String, AbstractMessageTo> producer,
46 Consumer<String, AbstractMessageTo> infoChannelConsumer,
50 "Creating InfoChannel for topic {}",
53 this.consumer = infoChannelConsumer;
54 this.producer = producer;
55 this.chatRoomInfo = new HashMap<>();
57 this.numShards = consumer
60 this.shardOwners = new String[numShards];
61 this.currentOffset = new long[numShards];
62 this.nextOffset = new long[numShards];
65 .forEach(partition -> this.nextOffset[partition] = -1l);
67 this.instanceUri = instanceUri.toASCIIString();
71 boolean isLoadInProgress()
75 .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
78 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
83 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
84 return Mono.create(sink ->
86 ProducerRecord<String, AbstractMessageTo> record =
89 Integer.toString(shard),
92 producer.send(record, ((metadata, exception) ->
94 if (exception == null)
96 log.info("Successfully sent created event for chat chat-room: {}", to);
97 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
98 sink.success(chatRoomInfo);
104 "Could not send created event for chat-room (id={}, name={}): {}",
108 sink.error(exception);
114 void sendShardAssignedEvent(int shard)
116 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
118 ProducerRecord<String, AbstractMessageTo> record =
119 new ProducerRecord<>(
121 Integer.toString(shard),
124 producer.send(record, ((metadata, exception) ->
126 if (metadata != null)
128 log.info("Successfully sent shard assigned event for shard: {}", shard);
134 "Could not send shard assigned event for shard {}: {}",
138 // Verhalten im Fehlerfall durchdenken!
139 // Z.B.: unsubscribe() und darauf folgendes (re-)subscribe() des
140 // Consumers veranlassen, so dass die nicht öffentlich Bekannte
141 // Zuständigkeit abgegeben und neu zugeordnet wird?
142 // Falls der Weg gegangen wird: Achtung wegen Sticke Partitions!
147 void sendShardRevokedEvent(int shard)
149 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
151 ProducerRecord<String, AbstractMessageTo> record =
152 new ProducerRecord<>(
154 Integer.toString(shard),
157 producer.send(record, ((metadata, exception) ->
159 if (metadata != null)
161 log.info("Successfully sent shard revoked event for shard: {}", shard);
167 "Could not send shard revoked event for shard {}: {}",
171 // Verhalten im Fehlerfall durchdenken!
172 // Ggf. einfach egal, da die neue zuständige Instanz den
173 // nicht gelöschten Eintrag eh überschreibt?
185 .endOffsets(consumer.assignment())
188 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
191 .forEach(partition -> this.nextOffset[partition] = 0l);
197 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
198 log.debug("Fetched {} messages", records.count());
199 for (ConsumerRecord<String, AbstractMessageTo> record : records)
201 handleMessage(record);
202 updateNextOffset(record.partition(), record.offset() + 1);
205 catch (WakeupException e)
207 log.info("Received WakeupException, exiting!");
212 log.info("Exiting normally");
215 private void updateNextOffset(int partition, long nextOffset)
217 this.nextOffset[partition] = nextOffset;
220 private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
222 switch (record.value().getType())
224 case EVENT_CHATROOM_CREATED:
225 EventChatRoomCreated eventChatRoomCreated =
226 (EventChatRoomCreated) record.value();
227 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
230 case EVENT_SHARD_ASSIGNED:
231 EventShardAssigned eventShardAssigned =
232 (EventShardAssigned) record.value();
234 "Shard {} was assigned to {}",
235 eventShardAssigned.getShard(),
236 eventShardAssigned.getUri());
237 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
240 case EVENT_SHARD_REVOKED:
241 EventShardRevoked eventShardRevoked =
242 (EventShardRevoked) record.value();
244 "Shard {} was revoked from {}",
245 eventShardRevoked.getShard(),
246 eventShardRevoked.getUri());
247 shardOwners[eventShardRevoked.getShard()] = null;
252 "Ignoring message for key={} with offset={}: {}",
259 private void createChatRoom(ChatRoomInfo chatRoomInfo)
261 UUID chatRoomId = chatRoomInfo.getId();
262 Integer partition = chatRoomInfo.getShard();
264 if (this.chatRoomInfo.containsKey(chatRoomId))
267 "Ignoring existing chat-room for {}: {}",
274 "Adding new chat-room for partition {}: {}",
278 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
282 Flux<ChatRoomInfo> getChatRoomInfo()
284 return Flux.fromIterable(chatRoomInfo.values());
287 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
289 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
292 Mono<String[]> getShardOwners()
294 return Mono.just(shardOwners);