NG
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatMessageChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import lombok.Getter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
18
19 import java.time.*;
20 import java.util.*;
21 import java.util.stream.IntStream;
22
23
24 @Slf4j
25 public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
26 {
27   private final String topic;
28   private final Producer<String, AbstractTo> producer;
29   private final Consumer<String, AbstractTo> consumer;
30   private final ZoneId zoneId;
31   private final int numShards;
32   private final int bufferSize;
33   private final Clock clock;
34   private final boolean[] isShardOwned;
35   private final long[] currentOffset;
36   private final long[] nextOffset;
37   private final Map<UUID, ChatRoom>[] chatrooms;
38
39   private boolean running;
40   @Getter
41   private volatile boolean loadInProgress;
42
43
44   public ChatMessageChannel(
45     String topic,
46     Producer<String, AbstractTo> producer,
47     Consumer<String, AbstractTo> consumer,
48     ZoneId zoneId,
49     int numShards,
50     int bufferSize,
51     Clock clock)
52   {
53     log.debug(
54         "Creating ChatMessageChannel for topic {} with {} partitions",
55         topic,
56         numShards);
57     this.topic = topic;
58     this.consumer = consumer;
59     this.producer = producer;
60     this.zoneId = zoneId;
61     this.numShards = numShards;
62     this.bufferSize = bufferSize;
63     this.clock = clock;
64     this.isShardOwned = new boolean[numShards];
65     this.currentOffset = new long[numShards];
66     this.nextOffset = new long[numShards];
67     this.chatrooms = new Map[numShards];
68     IntStream
69         .range(0, numShards)
70         .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
71   }
72
73
74
75   Mono<ChatRoomInfo> sendCreateChatRoomRequest(
76       UUID chatRoomId,
77       String name)
78   {
79     CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name);
80     return Mono.create(sink ->
81     {
82       ProducerRecord<String, CreateChatRoomRequestTo> record =
83           new ProducerRecord<>(
84               topic,
85               chatRoomId.toString(),
86               createChatRoomRequestTo);
87
88       producer.send(record, ((metadata, exception) ->
89       {
90         if (metadata != null)
91         {
92           log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
93           ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
94           createChatRoom(chatRoomInfo);
95           sink.success(chatRoomInfo);
96         }
97         else
98         {
99           // On send-failure
100           log.error(
101               "Could not send create-request for chat room (id={}, name={}): {}",
102               chatRoomId,
103               name,
104               exception);
105           sink.error(exception);
106         }
107       }));
108     });
109   }
110
111   Mono<Message> sendChatMessage(
112       UUID chatRoomId,
113       Message.MessageKey key,
114       LocalDateTime timestamp,
115       String text)
116   {
117     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
118     return Mono.create(sink ->
119     {
120       ProducerRecord<String, AbstractTo> record =
121           new ProducerRecord<>(
122               topic,
123               null,
124               zdt.toEpochSecond(),
125               chatRoomId.toString(),
126               ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
127
128       producer.send(record, ((metadata, exception) ->
129       {
130         if (metadata != null)
131         {
132           // On successful send
133           Message message = new Message(key, metadata.offset(), timestamp, text);
134           log.info("Successfully send message {}", message);
135           sink.success(message);
136         }
137         else
138         {
139           // On send-failure
140           log.error(
141               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
142               chatRoomId,
143               key,
144               timestamp,
145               text,
146               exception);
147           sink.error(exception);
148         }
149       }));
150     });
151   }
152
153   @Override
154   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
155   {
156     log.info("Newly assigned partitions! Pausing normal operations...");
157     loadInProgress = true;
158
159     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
160     {
161       int partition = topicPartition.partition();
162       isShardOwned[partition] =  true;
163       this.currentOffset[partition] = currentOffset;
164
165       log.info(
166           "Partition assigned: {} - loading messages: next={} -> current={}",
167           partition,
168           nextOffset[partition],
169           currentOffset);
170
171       consumer.seek(topicPartition, nextOffset[partition]);
172     });
173
174     consumer.resume(partitions);
175   }
176
177   @Override
178   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
179   {
180     partitions.forEach(topicPartition ->
181     {
182       int partition = topicPartition.partition();
183       isShardOwned[partition] = false;
184       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
185     });
186   }
187
188   @Override
189   public void onPartitionsLost(Collection<TopicPartition> partitions)
190   {
191     log.warn("Lost partitions: {}, partitions");
192     // TODO: Muss auf den Verlust anders reagiert werden?
193     onPartitionsRevoked(partitions);
194   }
195
196   @Override
197   public void run()
198   {
199     consumer.subscribe(List.of(topic), this);
200
201     running = true;
202
203     while (running)
204     {
205       try
206       {
207         ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
208         log.info("Fetched {} messages", records.count());
209
210         if (loadInProgress)
211         {
212           loadChatRoom(records);
213
214           if (isLoadingCompleted())
215           {
216             log.info("Loading of messages completed! Pausing all owned partitions...");
217             pauseAllOwnedPartions();
218             log.info("Resuming normal operations...");
219             loadInProgress = false;
220           }
221         }
222         else
223         {
224           if (!records.isEmpty())
225           {
226             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
227           }
228         }
229       }
230       catch (WakeupException e)
231       {
232         log.info("Received WakeupException, exiting!");
233         running = false;
234       }
235     }
236
237     log.info("Exiting normally");
238   }
239
240   void loadChatRoom(ConsumerRecords<String, AbstractTo> records)
241   {
242     for (ConsumerRecord<String, AbstractTo> record : records)
243     {
244       UUID chatRoomId = UUID.fromString(record.key());
245
246       switch (record.value().getType())
247       {
248         case CREATE_CHATROOM_REQUEST:
249           createChatRoom(
250               chatRoomId,
251               (CreateChatRoomRequestTo) record.value(),
252               record.partition());
253           break;
254
255         case MESSAGE_SENT:
256           Instant instant = Instant.ofEpochSecond(record.timestamp());
257           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
258           loadChatMessage(
259               chatRoomId,
260               timestamp,
261               record.offset(),
262               (ChatMessageTo) record.value(),
263               record.partition());
264           break;
265
266         default:
267           log.debug(
268               "Ignoring message for chat-room {} with offset {}: {}",
269               chatRoomId,
270               record.offset(),
271               record.value());
272       }
273
274       nextOffset[record.partition()] = record.offset() + 1;
275     }
276   }
277
278   void createChatRoom(
279       UUID chatRoomId,
280       CreateChatRoomRequestTo createChatRoomRequestTo,
281       int partition)
282   {
283     putChatRoom(ChatRoomInfo.of(
284         chatRoomId,
285         createChatRoomRequestTo.getName(),
286         partition));
287   }
288
289
290   void createChatRoom(ChatRoomInfo chatRoomInfo)
291   {
292     UUID id = chatRoomInfo.getId();
293     String name = chatRoomInfo.getName();
294     int shard = chatRoomInfo.getShard();
295     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
296     KafkaChatRoomService service = new KafkaChatRoomService(this, id);
297     ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
298     putChatRoom(chatRoom);
299   }
300
301   void loadChatMessage(
302       UUID chatRoomId,
303       LocalDateTime timestamp,
304       long offset,
305       ChatMessageTo chatMessageTo,
306       int partition)
307   {
308     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
309     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
310
311     ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
312     KafkaChatRoomService kafkaChatRoomService =
313         (KafkaChatRoomService) chatRoom.getChatRoomService();
314
315     kafkaChatRoomService.persistMessage(message);
316   }
317
318   boolean isLoadingCompleted()
319   {
320     return IntStream
321         .range(0, numShards)
322         .filter(shard -> isShardOwned[shard])
323         .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
324   }
325
326   void pauseAllOwnedPartions()
327   {
328     consumer.pause(IntStream
329         .range(0, numShards)
330         .filter(shard -> isShardOwned[shard])
331         .mapToObj(shard -> new TopicPartition(topic, shard))
332         .toList());
333   }
334
335
336   private void putChatRoom(ChatRoom chatRoom)
337   {
338     Integer partition = chatRoom.getShard();
339     UUID chatRoomId = chatRoom.getId();
340     if (chatrooms[partition].containsKey(chatRoomId))
341     {
342       log.warn("Ignoring existing chat-room: " + chatRoom);
343     }
344     else
345     {
346       log.info(
347           "Adding new chat-room to partition {}: {}",
348           partition,
349           chatRoom);
350
351       chatrooms[partition].put(chatRoomId, chatRoom);
352     }
353   }
354
355   Mono<ChatRoom> getChatRoom(int shard, UUID id)
356   {
357     return Mono.justOrEmpty(chatrooms[shard].get(id));
358   }
359 }