26e86963ec9b628bda647ca923350caedb3d1c20
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / InfoChannel.java
1 package de.juplo.kafka.chat.backend.implementation.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
4 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
5 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import org.apache.kafka.clients.consumer.ConsumerRecords;
10 import org.apache.kafka.clients.producer.Producer;
11 import org.apache.kafka.clients.producer.ProducerRecord;
12 import org.apache.kafka.common.errors.WakeupException;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
15
16 import java.time.*;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.UUID;
20 import java.util.stream.IntStream;
21
22
23 @Slf4j
24 public class InfoChannel implements Runnable
25 {
26   private final String topic;
27   private final Producer<String, AbstractMessageTo> producer;
28   private final Consumer<String, AbstractMessageTo> consumer;
29   private final int numShards;
30   private final long[] currentOffset;
31   private final long[] nextOffset;
32   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
33
34   private boolean running;
35
36
37   public InfoChannel(
38     String topic,
39     Producer<String, AbstractMessageTo> producer,
40     Consumer<String, AbstractMessageTo> infoChannelConsumer)
41   {
42     log.debug(
43         "Creating InfoChannel for topic {}",
44         topic);
45     this.topic = topic;
46     this.consumer = infoChannelConsumer;
47     this.producer = producer;
48     this.chatRoomInfo = new HashMap<>();
49
50     this.numShards = consumer
51         .partitionsFor(topic)
52         .size();
53     this.currentOffset = new long[numShards];
54     this.nextOffset = new long[numShards];
55     IntStream
56         .range(0, numShards)
57         .forEach(partition -> this.nextOffset[partition] = -1l);
58   }
59
60
61   boolean loadInProgress()
62   {
63     return IntStream
64         .range(0, numShards)
65         .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
66   }
67
68   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
69       UUID chatRoomId,
70       String name,
71       int shard)
72   {
73     EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
74     return Mono.create(sink ->
75     {
76       ProducerRecord<String, AbstractMessageTo> record =
77           new ProducerRecord<>(
78               topic,
79               Integer.toString(shard),
80               to);
81
82       producer.send(record, ((metadata, exception) ->
83       {
84         if (metadata != null)
85         {
86           log.info("Successfully sent chreate-request for chat room: {}", to);
87           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
88           sink.success(chatRoomInfo);
89         }
90         else
91         {
92           // On send-failure
93           log.error(
94               "Could not send create-request for chat room (id={}, name={}): {}",
95               chatRoomId,
96               name,
97               exception);
98           sink.error(exception);
99         }
100       }));
101     });
102   }
103
104
105   @Override
106   public void run()
107   {
108     running = true;
109
110     consumer
111         .endOffsets(consumer.assignment())
112         .entrySet()
113         .stream()
114         .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
115     IntStream
116         .range(0, numShards)
117         .forEach(partition -> this.nextOffset[partition] = 0l);
118
119     while (running)
120     {
121       try
122       {
123         ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
124         log.debug("Fetched {} messages", records.count());
125         handleMessages(records);
126       }
127       catch (WakeupException e)
128       {
129         log.info("Received WakeupException, exiting!");
130         running = false;
131       }
132     }
133
134     log.info("Exiting normally");
135   }
136
137   private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
138   {
139     for (ConsumerRecord<String, AbstractMessageTo> record : records)
140     {
141       switch (record.value().getType())
142       {
143         case EVENT_CHATROOM_CREATED:
144           EventChatRoomCreated eventChatRoomCreated =
145               (EventChatRoomCreated) record.value();
146           createChatRoom(eventChatRoomCreated.toChatRoomInfo());
147           break;
148
149         default:
150           log.debug(
151               "Ignoring message for key={} with offset={}: {}",
152               record.key(),
153               record.offset(),
154               record.value());
155       }
156
157       nextOffset[record.partition()] = record.offset() + 1;
158     }
159   }
160
161   private void createChatRoom(ChatRoomInfo chatRoomInfo)
162   {
163     UUID chatRoomId = chatRoomInfo.getId();
164     Integer partition = chatRoomInfo.getShard();
165
166     if (this.chatRoomInfo.containsKey(chatRoomId))
167     {
168       log.warn(
169           "Ignoring existing chat-room for {}: {}",
170           partition,
171           chatRoomId);
172     }
173     else
174     {
175       log.info(
176           "Adding new chat-room for partition {}: {}",
177           partition,
178           chatRoomId);
179
180       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
181     }
182   }
183
184   Flux<ChatRoomInfo> getChatRoomInfo()
185   {
186     return Flux.fromIterable(chatRoomInfo.values());
187   }
188
189   Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
190   {
191     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
192   }
193 }