1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
17 import java.util.HashMap;
18 import java.util.List;
20 import java.util.UUID;
21 import java.util.stream.IntStream;
24 @RequiredArgsConstructor
26 public class ChatRoomChannel implements Runnable
28 private final String topic;
29 private final Consumer<String, AbstractTo> consumer;
30 private final Map<UUID, ChatRoomInfo> chatrooms = new HashMap<>();
32 private boolean running;
38 consumer.assign(List.of(new TopicPartition(topic, 0)));
46 ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
47 log.info("Fetched {} messages", records.count());
49 for (ConsumerRecord<String, AbstractTo> record : records)
51 switch (record.value().getType())
54 createChatRoom((ChatRoomInfoTo) record.value());
59 "Ignoring message for key {} with offset {}: {}",
66 catch (WakeupException e)
68 log.info("Received WakeupException, exiting!");
73 log.info("Exiting normally");
77 void createChatRoom(ChatRoomInfoTo chatRoomInfoTo)
79 ChatRoomInfo chatRoomInfo = chatRoomInfoTo.toChatRoomInfo();
80 chatrooms.put(chatRoomInfo.getId(), chatRoomInfo);
83 Flux<ChatRoomInfo> getChatRooms()
85 return Flux.fromIterable(chatrooms.values());