NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
7 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.RecordDeserializationException;
17 import org.apache.kafka.common.errors.WakeupException;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
20
21 import java.time.*;
22 import java.util.*;
23 import java.util.stream.IntStream;
24
25
26 @Slf4j
27 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
28 {
29   private final String chatRoomsTopic;
30   private final Consumer<Integer, ChatRoomTo> chatRoomsConsumer;
31   private final Producer<Integer, ChatRoomTo> chatRoomsProducer;
32   private final String chatMessagesTopic;
33   private final Consumer<String, MessageTo> chatMessagesConsumer;
34   private final Producer<String, MessageTo> chatMessagesProducer;
35   private final ZoneId zoneId;
36   private final int numShards;
37   private final boolean[] isShardOwned;
38   private final long[] currentOffset;
39   private final long[] nextOffset;
40   private final Map<UUID, ChatRoom>[] chatrooms;
41   private final KafkaLikeShardingStrategy shardingStrategy;
42
43   private boolean running;
44   private volatile boolean loadInProgress;
45
46
47   public KafkaChatHomeService(
48     String chatRoomsTopic,
49     Consumer<Integer, ChatRoomTo> chatRoomsConsumer,
50     Producer<Integer, ChatRoomTo> chatRoomsProducer,
51     String chatMessagesTopic,
52     Consumer<String, MessageTo> chatMessagesConsumer,
53     Producer<String, MessageTo> chatMessagesProducer,
54     ZoneId zoneId,
55     int numShards)
56   {
57     log.debug("Creating KafkaChatHomeService");
58     this.chatRoomsTopic = chatRoomsTopic;
59     this.chatRoomsConsumer = chatRoomsConsumer;
60     this.chatRoomsProducer = chatRoomsProducer;
61     this.chatMessagesTopic = chatMessagesTopic;
62     this.chatMessagesConsumer = chatMessagesConsumer;
63     this.chatMessagesProducer = chatMessagesProducer;
64     this.zoneId = zoneId;
65     this.numShards = numShards;
66     this.isShardOwned = new boolean[numShards];
67     this.currentOffset = new long[numShards];
68     this.nextOffset = new long[numShards];
69     this.chatrooms = new Map[numShards];
70     this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
71   }
72
73
74   @Override
75   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
76   {
77     log.info("Newly assigned partitions! Pausing normal operations...");
78     loadInProgress = true;
79
80     chatMessagesConsumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
81     {
82       int partition = topicPartition.partition();
83       isShardOwned[partition] =  true;
84       this.currentOffset[partition] = currentOffset;
85
86       log.info(
87           "Partition assigned: {} - loading messages: next={} -> current={}",
88           partition,
89           nextOffset[partition],
90           currentOffset);
91
92       chatMessagesConsumer.seek(topicPartition, nextOffset[partition]);
93     });
94
95     chatMessagesConsumer.resume(partitions);
96   }
97
98   @Override
99   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
100   {
101     partitions.forEach(topicPartition ->
102     {
103       int partition = topicPartition.partition();
104       isShardOwned[partition] = false;
105       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
106     });
107   }
108
109   @Override
110   public void onPartitionsLost(Collection<TopicPartition> partitions)
111   {
112     log.warn("Lost partitions: {}, partitions");
113     // TODO: Muss auf den Verlust anders reagiert werden?
114     onPartitionsRevoked(partitions);
115   }
116
117   @Override
118   public void run()
119   {
120     chatMessagesConsumer.subscribe(List.of(chatMessagesTopic));
121
122     running = true;
123
124     while (running)
125     {
126       try
127       {
128         ConsumerRecords<String, MessageTo> records = chatMessagesConsumer.poll(Duration.ofMinutes(5));
129         log.info("Fetched {} messages", records.count());
130
131         if (loadInProgress)
132         {
133           loadMessages(records);
134
135           if (isLoadingCompleted())
136           {
137             log.info("Loading of messages completed! Pausing all owned partitions...");
138             pauseAllOwnedPartions();
139             log.info("Resuming normal operations...");
140             loadInProgress = false;
141           }
142         }
143         else
144         {
145           if (!records.isEmpty())
146           {
147             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
148           }
149         }
150       }
151       catch (WakeupException e)
152       {
153       }
154       catch (RecordDeserializationException e)
155       {
156       }
157     }
158   }
159
160   void loadMessages(ConsumerRecords<String, MessageTo> records)
161   {
162     for (ConsumerRecord<String, MessageTo> record : records)
163     {
164       nextOffset[record.partition()] = record.offset() + 1;
165       UUID chatRoomId = UUID.fromString(record.key());
166       MessageTo messageTo = record.value();
167
168       Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
169
170       Instant instant = Instant.ofEpochSecond(record.timestamp());
171       LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
172
173       Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
174
175       ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
176       KafkaChatRoomService kafkaChatRoomService =
177           (KafkaChatRoomService) chatRoom.getChatRoomService();
178
179       kafkaChatRoomService.persistMessage(message);
180     }
181   }
182
183   boolean isLoadingCompleted()
184   {
185     return IntStream
186         .range(0, numShards)
187         .filter(shard -> isShardOwned[shard])
188         .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
189         .collect(
190             () -> Boolean.TRUE,
191             (acc, v) -> Boolean.valueOf(acc && v),
192             (a, b) -> Boolean.valueOf(a && b));
193   }
194
195   void pauseAllOwnedPartions()
196   {
197     chatMessagesConsumer.pause(IntStream
198         .range(0, numShards)
199         .filter(shard -> isShardOwned[shard])
200         .mapToObj(shard -> new TopicPartition(chatMessagesTopic, shard))
201         .toList());
202   }
203
204   Mono<Message> sendMessage(
205       UUID chatRoomId,
206       Message.MessageKey key,
207       LocalDateTime timestamp,
208       String text)
209   {
210     int shard = this.shardingStrategy.selectShard(chatRoomId);
211     TopicPartition tp = new TopicPartition(chatMessagesTopic, shard);
212     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
213     return Mono.create(sink ->
214     {
215       ProducerRecord<String, MessageTo> record =
216           new ProducerRecord<>(
217               tp.topic(),
218               tp.partition(),
219               zdt.toEpochSecond(),
220               chatRoomId.toString(),
221               MessageTo.of(key.getUsername(), key.getMessageId(), text));
222
223       chatMessagesProducer.send(record, ((metadata, exception) ->
224       {
225         if (metadata != null)
226         {
227           // On successful send
228           Message message = new Message(key, metadata.offset(), timestamp, text);
229           log.info("Successfully send message {}", message);
230           sink.success(message);
231         }
232         else
233         {
234           // On send-failure
235           log.error(
236               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
237               chatRoomId,
238               key,
239               timestamp,
240               text,
241               exception);
242           sink.error(exception);
243         }
244       }));
245     });
246   }
247
248
249   public void putChatRoom(ChatRoom chatRoom)
250   {
251
252     ProducerRecord<Integer, ChatRoomTo> record = new ProducerRecord<>(chatRoom.getShard(), );
253     // TODO: Nachricht senden!
254     chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
255   }
256
257   @Override
258   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
259   {
260     if (loadInProgress)
261     {
262       throw new ShardNotOwnedException(shard);
263     }
264     else
265     {
266       return Mono.justOrEmpty(chatrooms[shard].get(id));
267     }
268   }
269
270   @Override
271   public Flux<ChatRoom> getChatRooms(int shard)
272   {
273     if (loadInProgress)
274     {
275       throw new ShardNotOwnedException(shard);
276     }
277     else
278     {
279       return Flux.fromStream(chatrooms[shard].values().stream());
280     }
281   }
282 }