+
+ @Override
+ public void run()
+ {
+ consumer.assign(List.of(new TopicPartition(topic, 0)));
+
+ running = true;
+
+ while (running)
+ {
+ try
+ {
+ ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
+ log.info("Fetched {} messages", records.count());
+
+ for (ConsumerRecord<Integer, ChatRoomTo> record : records)
+ {
+ createChatRoom(record.value().toChatRoomInfo());
+ }
+ }
+ catch (WakeupException e)
+ {
+ log.info("Received WakeupException, exiting!");
+ running = false;
+ }
+ }
+ }
+
+ void createChatRoom(ChatRoomInfo chatRoomInfo)
+ {
+ UUID id = chatRoomInfo.getId();
+ String name = chatRoomInfo.getName();
+ int shard = chatRoomInfo.getShard();
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ chatMessageChannel.putChatRoom(chatRoom);
+ }