1c6ae915c7cf4970874ff5b89a32e4242d6ec2a1
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.*;
4 import lombok.RequiredArgsConstructor;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRecord;
8 import org.apache.kafka.clients.consumer.ConsumerRecords;
9 import org.apache.kafka.clients.producer.Producer;
10 import org.apache.kafka.clients.producer.ProducerRecord;
11 import org.apache.kafka.common.TopicPartition;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15
16 import java.time.*;
17 import java.util.HashMap;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.UUID;
21 import java.util.stream.IntStream;
22
23
24 @RequiredArgsConstructor
25 @Slf4j
26 public class ChatRoomChannel implements Runnable
27 {
28   private final String topic;
29   private final Consumer<String, AbstractTo> consumer;
30   private final Map<UUID, ChatRoomInfo> chatrooms = new HashMap<>();
31
32   private boolean running;
33
34
35   @Override
36   public void run()
37   {
38     consumer.assign(List.of(new TopicPartition(topic, 0)));
39
40     running = true;
41
42     while (running)
43     {
44       try
45       {
46         ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
47         log.info("Fetched {} messages", records.count());
48
49         for (ConsumerRecord<String, AbstractTo> record : records)
50         {
51           switch (record.value().getType())
52           {
53             case CHATROOM_INFO:
54               createChatRoom((ChatRoomInfoTo) record.value());
55               break;
56
57             default:
58               log.debug(
59                   "Ignoring message for key {} with offset {}: {}",
60                   record.key(),
61                   record.offset(),
62                   record.value());
63           }
64         }
65       }
66       catch (WakeupException e)
67       {
68         log.info("Received WakeupException, exiting!");
69         running = false;
70       }
71     }
72
73     log.info("Exiting normally");
74   }
75
76
77   void createChatRoom(ChatRoomInfoTo chatRoomInfoTo)
78   {
79     ChatRoomInfo chatRoomInfo = chatRoomInfoTo.toChatRoomInfo();
80     chatrooms.put(chatRoomInfo.getId(), chatRoomInfo);
81   }
82
83   Flux<ChatRoomInfo> getChatRooms()
84   {
85     return Flux.fromIterable(chatrooms.values());
86   }
87 }