1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Mono;
16 import java.util.List;
17 import java.util.UUID;
20 @RequiredArgsConstructor
22 public class ChatRoomChannel implements Runnable
24 private final String topic;
25 private final Producer<Integer, CreateChatRoomRequestTo> producer;
26 private final Consumer<Integer, CreateChatRoomRequestTo> consumer;
27 private final ShardingStrategy shardingStrategy;
28 private final ChatMessageChannel chatMessageChannel;
29 private final Clock clock;
30 private final int bufferSize;
32 private boolean running;
38 consumer.assign(List.of(new TopicPartition(topic, 0)));
46 ConsumerRecords<Integer, CreateChatRoomRequestTo> records = consumer.poll(Duration.ofMinutes(5));
47 log.info("Fetched {} messages", records.count());
49 for (ConsumerRecord<Integer, CreateChatRoomRequestTo> record : records)
51 createChatRoom(record.value().toChatRoomInfo());
54 catch (WakeupException e)
56 log.info("Received WakeupException, exiting!");
61 log.info("Exiting normally");
65 void createChatRoom(ChatRoomInfo chatRoomInfo)
67 UUID id = chatRoomInfo.getId();
68 String name = chatRoomInfo.getName();
69 int shard = chatRoomInfo.getShard();
70 log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
71 KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
72 ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
73 chatMessageChannel.putChatRoom(chatRoom);