1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
7 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.clients.producer.RecordMetadata;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
21 import java.util.HashMap;
23 import java.util.UUID;
24 import java.util.stream.IntStream;
28 public class InfoChannel implements Runnable
30 private final String topic;
31 private final Producer<String, AbstractMessageTo> producer;
32 private final Consumer<String, AbstractMessageTo> consumer;
33 private final int numShards;
34 private final String[] shardOwners;
35 private final long[] currentOffset;
36 private final long[] nextOffset;
37 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
38 private final String instanceUri;
40 private boolean running;
45 Producer<String, AbstractMessageTo> producer,
46 Consumer<String, AbstractMessageTo> infoChannelConsumer,
50 "Creating InfoChannel for topic {}",
53 this.consumer = infoChannelConsumer;
54 this.producer = producer;
55 this.chatRoomInfo = new HashMap<>();
57 this.numShards = consumer
60 this.shardOwners = new String[numShards];
61 this.currentOffset = new long[numShards];
62 this.nextOffset = new long[numShards];
65 .forEach(partition -> this.nextOffset[partition] = -1l);
67 this.instanceUri = instanceUri.toASCIIString();
71 boolean loadInProgress()
75 .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
78 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
83 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
84 return Mono.create(sink ->
86 ProducerRecord<String, AbstractMessageTo> record =
89 Integer.toString(shard),
92 producer.send(record, ((metadata, exception) ->
94 if (exception == null)
96 log.info("Successfully sent created event for chat chat-room: {}", to);
97 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
98 sink.success(chatRoomInfo);
104 "Could not send created event for chat-room (id={}, name={}): {}",
108 sink.error(exception);
114 void sendShardAssignedEvent(int shard)
116 EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
118 ProducerRecord<String, AbstractMessageTo> record =
119 new ProducerRecord<>(
121 Integer.toString(shard),
124 producer.send(record, ((metadata, exception) ->
126 if (metadata != null)
128 log.info("Successfully sent shard assigned event for shard: {}", shard);
134 "Could not send shard assigned event for shard {}: {}",
141 void sendShardRevokedEvent(int shard)
143 EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
145 ProducerRecord<String, AbstractMessageTo> record =
146 new ProducerRecord<>(
148 Integer.toString(shard),
151 producer.send(record, ((metadata, exception) ->
153 if (metadata != null)
155 log.info("Successfully sent shard revoked event for shard: {}", shard);
161 "Could not send shard revoked event for shard {}: {}",
175 .endOffsets(consumer.assignment())
178 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
181 .forEach(partition -> this.nextOffset[partition] = 0l);
187 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
188 log.debug("Fetched {} messages", records.count());
189 handleMessages(records);
191 catch (WakeupException e)
193 log.info("Received WakeupException, exiting!");
198 log.info("Exiting normally");
201 private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
203 for (ConsumerRecord<String, AbstractMessageTo> record : records)
205 switch (record.value().getType())
207 case EVENT_CHATROOM_CREATED:
208 EventChatRoomCreated eventChatRoomCreated =
209 (EventChatRoomCreated) record.value();
210 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
213 case EVENT_SHARD_ASSIGNED:
214 EventShardAssigned eventShardAssigned =
215 (EventShardAssigned) record.value();
217 "Shard {} was assigned to {}",
218 eventShardAssigned.getShard(),
219 eventShardAssigned.getUri());
220 shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
223 case EVENT_SHARD_REVOKED:
224 EventShardRevoked eventShardRevoked =
225 (EventShardRevoked) record.value();
227 "Shard {} was revoked from {}",
228 eventShardRevoked.getShard(),
229 eventShardRevoked.getUri());
230 shardOwners[eventShardRevoked.getShard()] = null;
235 "Ignoring message for key={} with offset={}: {}",
241 nextOffset[record.partition()] = record.offset() + 1;
245 private void createChatRoom(ChatRoomInfo chatRoomInfo)
247 UUID chatRoomId = chatRoomInfo.getId();
248 Integer partition = chatRoomInfo.getShard();
250 if (this.chatRoomInfo.containsKey(chatRoomId))
253 "Ignoring existing chat-room for {}: {}",
260 "Adding new chat-room for partition {}: {}",
264 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
268 Flux<ChatRoomInfo> getChatRoomInfo()
270 return Flux.fromIterable(chatRoomInfo.values());
273 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
275 return Mono.fromSupplier(() -> chatRoomInfo.get(id));
278 Mono<String[]> getShardOwners()
280 return Mono.just(shardOwners);