1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Mono;
16 import java.util.List;
17 import java.util.Optional;
18 import java.util.UUID;
19 import java.util.concurrent.Callable;
22 @RequiredArgsConstructor
24 public class ChatRoomChannel implements Callable<Optional<Exception>>
26 private final String topic;
27 private final Producer<Integer, ChatRoomTo> producer;
28 private final Consumer<Integer, ChatRoomTo> consumer;
29 private final ShardingStrategy shardingStrategy;
30 private final ChatMessageChannel chatMessageChannel;
31 private final Clock clock;
32 private final int bufferSize;
34 private boolean running;
37 Mono<ChatRoomInfo> sendCreateChatRoomRequest(
41 int shard = this.shardingStrategy.selectShard(chatRoomId);
42 ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
43 return Mono.create(sink ->
45 ProducerRecord<Integer, ChatRoomTo> record =
51 producer.send(record, ((metadata, exception) ->
55 log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
56 sink.success(chatRoomTo.toChatRoomInfo());
62 "Could not send create-request for chat room (id={}, name={}): {}",
66 sink.error(exception);
73 public Optional<Exception> call()
75 consumer.assign(List.of(new TopicPartition(topic, 0)));
83 ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
84 log.info("Fetched {} messages", records.count());
86 for (ConsumerRecord<Integer, ChatRoomTo> record : records)
88 createChatRoom(record.value().toChatRoomInfo());
91 catch (WakeupException e)
93 log.info("Received WakeupException, exiting!");
98 log.error("Exiting abnormally!");
99 return Optional.of(e);
103 log.info("Exiting normally");
104 return Optional.empty();
108 void createChatRoom(ChatRoomInfo chatRoomInfo)
110 UUID id = chatRoomInfo.getId();
111 String name = chatRoomInfo.getName();
112 int shard = chatRoomInfo.getShard();
113 log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
114 KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
115 ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
116 chatMessageChannel.putChatRoom(chatRoom);