1 package de.juplo.kafka.chat.backend.implementation.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.producer.Producer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
17 import java.util.HashMap;
19 import java.util.UUID;
20 import java.util.stream.IntStream;
// Polls a Kafka consumer for messages, keeps the chat-room info carried by
// EVENT_CHATROOM_CREATED messages in an in-memory map, and sends
// chat-room-created events through the producer.
24 public class InfoChannel implements Runnable
// Name of the topic this channel was created for.
26 private final String topic;
// Used to send the chat-room-created events.
27 private final Producer<String, AbstractMessageTo> producer;
// Polled for incoming messages and queried for the assigned partitions.
28 private final Consumer<String, AbstractMessageTo> consumer;
// Length of the two per-partition offset arrays below.
29 private final int numShards;
// Indexed by partition; filled from an endOffsets(...) lookup, so despite the
// name the entries hold the end offsets that were reported at that time.
30 private final long[] currentOffset;
// Indexed by partition; -1 after construction, later 0, then (offset + 1) of
// the last handled record of that partition.
31 private final long[] nextOffset;
// Known chat rooms, keyed by chat-room id.
// NOTE(review): plain HashMap - if the map is read from a different thread than
// the one that handles the polled records, it needs synchronization or a
// concurrent map. Confirm the threading model.
32 private final Map<UUID, ChatRoomInfo> chatRoomInfo;
// NOTE(review): presumably the flag that keeps the polling loop alive; it is not
// volatile, so a write from another thread is not guaranteed to become visible.
34 private boolean running;
39 Producer<String, AbstractMessageTo> producer,
40 Consumer<String, AbstractMessageTo> infoChannelConsumer)
43 "Creating InfoChannel for topic {}",
// Store the collaborators; the chat-room map starts out empty.
46 this.consumer = infoChannelConsumer;
47 this.producer = producer;
48 this.chatRoomInfo = new HashMap<>();
// The shard count is obtained from the consumer and sizes both offset arrays.
50 this.numShards = consumer
53 this.currentOffset = new long[numShards];
54 this.nextOffset = new long[numShards];
// -1 is smaller than the zero-initialized currentOffset entries, so the
// nextOffset < currentOffset comparison in loadInProgress() holds for these
// partitions until both arrays are updated.
57 .forEach(partition -> this.nextOffset[partition] = -1l);
/**
 * Reports whether the stored next offset of any tested partition is still
 * below the offset recorded in {@code currentOffset} for that partition.
 *
 * @return {@code true} if at least one tested partition index satisfies
 *         {@code nextOffset[partition] < currentOffset[partition]}
 */
61 boolean loadInProgress()
65 .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
/**
 * Builds an {@code EventChatRoomCreated} from {@code chatRoomId}, {@code name}
 * and {@code shard} and hands it to the producer from within the
 * {@code Mono.create} callback, reporting the outcome through the sink.
 *
 * @return a Mono that emits {@code new ChatRoomInfo(chatRoomId, name, shard)}
 *         on the success path of the send callback and signals the callback's
 *         exception on the failure path
 */
68 Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
// The event object is created eagerly, before the Mono is assembled.
73 EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
74 return Mono.create(sink ->
// Record construction and send happen inside the Mono.create callback.
76 ProducerRecord<String, AbstractMessageTo> record =
// The shard number rendered as a string - presumably the record key; confirm
// against the full constructor call.
79 Integer.toString(shard),
// The outcome of the send is delivered asynchronously to this callback.
82 producer.send(record, ((metadata, exception) ->
// NOTE(review): "chreate-request" looks like a typo for "create-request" (the
// failure message below spells it correctly). Left untouched because it is a
// runtime log string that log searches may depend on.
86 log.info("Successfully sent chreate-request for chat room: {}", to);
87 ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
88 sink.success(chatRoomInfo);
94 "Could not send create-request for chat room (id={}, name={}): {}",
// Propagate the send failure to the subscriber.
98 sink.error(exception);
// Look up the end offsets of the currently assigned partitions and store each
// one in currentOffset under its partition number.
// NOTE(review): if assignment() is still empty at this point, no end offsets are
// recorded - confirm that partitions are assigned before this code runs.
111 .endOffsets(consumer.assignment())
114 .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
// nextOffset entries are set to 0 here (the constructor initialised them to -1).
117 .forEach(partition -> this.nextOffset[partition] = 0l);
// Waits up to one minute for records before returning.
123 ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
124 log.debug("Fetched {} messages", records.count());
125 handleMessages(records);
// A WakeupException is reported at info level rather than as an error.
127 catch (WakeupException e)
129 log.info("Received WakeupException, exiting!");
134 log.info("Exiting normally");
/**
 * Walks the given records, dispatches on each message's type and stores, per
 * partition, the offset that follows the handled record.
 *
 * @param records the records to process
 */
137 private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
139 for (ConsumerRecord<String, AbstractMessageTo> record : records)
// NOTE(review): record.value() is dereferenced without a null check; a record
// with a null value would throw here - confirm such records cannot appear.
141 switch (record.value().getType())
143 case EVENT_CHATROOM_CREATED:
// Presumably the type tag guarantees the concrete class, making the cast safe.
144 EventChatRoomCreated eventChatRoomCreated =
145 (EventChatRoomCreated) record.value();
146 createChatRoom(eventChatRoomCreated.toChatRoomInfo());
151 "Ignoring message for key={} with offset={}: {}",
// Remember the offset following this record for its partition; this is the
// value loadInProgress() compares against currentOffset.
157 nextOffset[record.partition()] = record.offset() + 1;
/**
 * Registers the given chat room in the in-memory map, keyed by its id; an id
 * that is already present is logged as an ignored duplicate.
 *
 * @param chatRoomInfo the chat room to register
 */
161 private void createChatRoom(ChatRoomInfo chatRoomInfo)
163 UUID chatRoomId = chatRoomInfo.getId();
// The room's shard is referred to as "partition" in this method.
164 Integer partition = chatRoomInfo.getShard();
// Duplicate check by chat-room id.
166 if (this.chatRoomInfo.containsKey(chatRoomId))
169 "Ignoring existing chat-room for {}: {}",
176 "Adding new chat-room for partition {}: {}",
// The parameter shadows the field of the same name, hence the explicit "this.".
180 this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
/**
 * Exposes the chat rooms held in the in-memory map.
 *
 * @return a Flux that iterates over the map's values
 */
184 Flux<ChatRoomInfo> getChatRoomInfo()
// NOTE(review): values() is a live view of the HashMap; if a room is added
// while a subscriber is iterating, a ConcurrentModificationException is
// possible - confirm readers and the writer cannot overlap.
186 return Flux.fromIterable(chatRoomInfo.values());
/**
 * Looks up a single chat room by id.
 *
 * @param id the chat-room id to look up
 * @return a Mono that performs the map lookup on subscription; since
 *         {@code Mono.fromSupplier} completes empty when the supplier yields
 *         {@code null}, an id without a mapping results in an empty Mono
 */
189 Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
191 return Mono.fromSupplier(() -> chatRoomInfo.get(id));