7b19bb6b1f2584c6246e05a831aa0b68efedbf81
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatMessageChannel.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatRoom;
4 import de.juplo.kafka.chat.backend.domain.Message;
5 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
6 import lombok.Getter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
10 import org.apache.kafka.clients.consumer.ConsumerRecord;
11 import org.apache.kafka.clients.consumer.ConsumerRecords;
12 import org.apache.kafka.clients.producer.Producer;
13 import org.apache.kafka.clients.producer.ProducerRecord;
14 import org.apache.kafka.common.TopicPartition;
15 import org.apache.kafka.common.errors.WakeupException;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
18
19 import java.time.*;
20 import java.util.*;
21 import java.util.concurrent.Callable;
22 import java.util.stream.IntStream;
23
24
25 @Slf4j
26 public class ChatMessageChannel implements Callable<Optional<Exception>>, ConsumerRebalanceListener
27 {
28   private final String topic;
29   private final Producer<String, MessageTo> producer;
30   private final Consumer<String, MessageTo> consumer;
31   private final ZoneId zoneId;
32   private final int numShards;
33   private final boolean[] isShardOwned;
34   private final long[] currentOffset;
35   private final long[] nextOffset;
36   private final Map<UUID, ChatRoom>[] chatrooms;
37   private final KafkaLikeShardingStrategy shardingStrategy;
38
39   private boolean running;
40   @Getter
41   private volatile boolean loadInProgress;
42
43
44   public ChatMessageChannel(
45     String topic,
46     Producer<String, MessageTo> producer,
47     Consumer<String, MessageTo> consumer,
48     ZoneId zoneId,
49     int numShards)
50   {
51     log.debug(
52         "Creating ChatMessageChannel for topic {} with {} partitions",
53         topic,
54         numShards);
55     this.topic = topic;
56     this.consumer = consumer;
57     this.producer = producer;
58     this.zoneId = zoneId;
59     this.numShards = numShards;
60     this.isShardOwned = new boolean[numShards];
61     this.currentOffset = new long[numShards];
62     this.nextOffset = new long[numShards];
63     this.chatrooms = new Map[numShards];
64     this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
65   }
66
67
68   Mono<Message> sendMessage(
69       UUID chatRoomId,
70       Message.MessageKey key,
71       LocalDateTime timestamp,
72       String text)
73   {
74     int shard = this.shardingStrategy.selectShard(chatRoomId);
75     TopicPartition tp = new TopicPartition(topic, shard);
76     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
77     return Mono.create(sink ->
78     {
79       ProducerRecord<String, MessageTo> record =
80           new ProducerRecord<>(
81               tp.topic(),
82               tp.partition(),
83               zdt.toEpochSecond(),
84               chatRoomId.toString(),
85               MessageTo.of(key.getUsername(), key.getMessageId(), text));
86
87       producer.send(record, ((metadata, exception) ->
88       {
89         if (metadata != null)
90         {
91           // On successful send
92           Message message = new Message(key, metadata.offset(), timestamp, text);
93           log.info("Successfully send message {}", message);
94           sink.success(message);
95         }
96         else
97         {
98           // On send-failure
99           log.error(
100               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
101               chatRoomId,
102               key,
103               timestamp,
104               text,
105               exception);
106           sink.error(exception);
107         }
108       }));
109     });
110   }
111
112   @Override
113   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
114   {
115     log.info("Newly assigned partitions! Pausing normal operations...");
116     loadInProgress = true;
117
118     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
119     {
120       int partition = topicPartition.partition();
121       isShardOwned[partition] =  true;
122       this.currentOffset[partition] = currentOffset;
123
124       log.info(
125           "Partition assigned: {} - loading messages: next={} -> current={}",
126           partition,
127           nextOffset[partition],
128           currentOffset);
129
130       consumer.seek(topicPartition, nextOffset[partition]);
131     });
132
133     consumer.resume(partitions);
134   }
135
136   @Override
137   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
138   {
139     partitions.forEach(topicPartition ->
140     {
141       int partition = topicPartition.partition();
142       isShardOwned[partition] = false;
143       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
144     });
145   }
146
147   @Override
148   public void onPartitionsLost(Collection<TopicPartition> partitions)
149   {
150     log.warn("Lost partitions: {}, partitions");
151     // TODO: Muss auf den Verlust anders reagiert werden?
152     onPartitionsRevoked(partitions);
153   }
154
155   @Override
156   public Optional<Exception> call()
157   {
158     consumer.subscribe(List.of(topic));
159
160     running = true;
161
162     while (running)
163     {
164       try
165       {
166         ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
167         log.info("Fetched {} messages", records.count());
168
169         if (loadInProgress)
170         {
171           loadMessages(records);
172
173           if (isLoadingCompleted())
174           {
175             log.info("Loading of messages completed! Pausing all owned partitions...");
176             pauseAllOwnedPartions();
177             log.info("Resuming normal operations...");
178             loadInProgress = false;
179           }
180         }
181         else
182         {
183           if (!records.isEmpty())
184           {
185             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
186           }
187         }
188       }
189       catch (WakeupException e)
190       {
191         log.info("Received WakeupException, exiting!");
192         running = false;
193       }
194       catch (Exception e)
195       {
196         log.error("Exiting abnormally!");
197         return Optional.of(e);
198       }
199     }
200
201     log.info("Exiting normally");
202     return Optional.empty();
203   }
204
205   void loadMessages(ConsumerRecords<String, MessageTo> records)
206   {
207     for (ConsumerRecord<String, MessageTo> record : records)
208     {
209       nextOffset[record.partition()] = record.offset() + 1;
210       UUID chatRoomId = UUID.fromString(record.key());
211       MessageTo messageTo = record.value();
212
213       Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
214
215       Instant instant = Instant.ofEpochSecond(record.timestamp());
216       LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
217
218       Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
219
220       ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
221       if (chatRoom == null)
222       {
223         // Alles pausieren und erst von putChatRoom wieder resumen lassen!
224       }
225       KafkaChatRoomService kafkaChatRoomService =
226           (KafkaChatRoomService) chatRoom.getChatRoomService();
227
228       kafkaChatRoomService.persistMessage(message);
229     }
230   }
231
232   boolean isLoadingCompleted()
233   {
234     return IntStream
235         .range(0, numShards)
236         .filter(shard -> isShardOwned[shard])
237         .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
238         .collect(
239             () -> Boolean.TRUE, // TODO: Boolean is immutable
240             (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable
241             (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable
242   }
243
244   void pauseAllOwnedPartions()
245   {
246     consumer.pause(IntStream
247         .range(0, numShards)
248         .filter(shard -> isShardOwned[shard])
249         .mapToObj(shard -> new TopicPartition(topic, shard))
250         .toList());
251   }
252
253
254   void putChatRoom(ChatRoom chatRoom)
255   {
256     Integer partition = chatRoom.getShard();
257     UUID chatRoomId = chatRoom.getId();
258     if (chatrooms[partition].containsKey(chatRoomId))
259     {
260       log.warn("Ignoring existing chat-room: " + chatRoom);
261     }
262     else
263     {
264       log.info(
265           "Adding new chat-room to partition {}: {}",
266           partition,
267           chatRoom);
268
269       chatrooms[partition].put(chatRoomId, chatRoom);
270     }
271   }
272
273   Mono<ChatRoom> getChatRoom(int shard, UUID id)
274   {
275     return Mono.justOrEmpty(chatrooms[shard].get(id));
276   }
277
278   Flux<ChatRoom> getChatRooms()
279   {
280     return Flux.fromStream(IntStream
281         .range(0, numShards)
282         .filter(shard -> isShardOwned[shard])
283         .mapToObj(shard -> Integer.valueOf(shard))
284         .flatMap(shard -> chatrooms[shard].values().stream()));
285   }
286 }