NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
7 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.RecordDeserializationException;
17 import org.apache.kafka.common.errors.WakeupException;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
20
21 import java.time.*;
22 import java.util.*;
23 import java.util.concurrent.ExecutorService;
24 import java.util.stream.IntStream;
25
26
27 @Slf4j
28 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
29 {
30   private final ExecutorService executorService;
31   private final Consumer<String, MessageTo> consumer;
32   private final Producer<String, MessageTo> producer;
33   private final String topic;
34   private final ZoneId zoneId;
35   private final int numShards;
36   private final boolean[] isShardOwned;
37   private final long[] currentOffset;
38   private final long[] nextOffset;
39   private final Map<UUID, ChatRoom>[] chatRoomMaps;
40   private final KafkaLikeShardingStrategy shardingStrategy;
41
42   private boolean running;
43   private volatile boolean loadInProgress;
44
45
46   public KafkaChatHomeService(
47     ExecutorService executorService,
48     Consumer<String, MessageTo> consumer,
49     Producer<String, MessageTo> producer,
50     String topic,
51     ZoneId zoneId,
52     int numShards)
53   {
54     log.debug("Creating KafkaChatHomeService");
55     this.executorService = executorService;
56     this.consumer = consumer;
57     this.producer = producer;
58     this.topic = topic;
59     this.zoneId = zoneId;
60     this.numShards = numShards;
61     this.isShardOwned = new boolean[numShards];
62     this.currentOffset = new long[numShards];
63     this.nextOffset = new long[numShards];
64     this.chatRoomMaps = new Map[numShards];
65     this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
66   }
67
68
69   @Override
70   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
71   {
72     log.info("Newly assigned partitions! Pausing normal operations...");
73     loadInProgress = true;
74
75     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
76     {
77       int partition = topicPartition.partition();
78       isShardOwned[partition] =  true;
79       this.currentOffset[partition] = currentOffset;
80
81       log.info(
82           "Partition assigned: {} - loading messages: next={} -> current={}",
83           partition,
84           nextOffset[partition],
85           currentOffset);
86
87       consumer.seek(topicPartition, nextOffset[partition]);
88     });
89
90     consumer.resume(partitions);
91   }
92
93   @Override
94   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
95   {
96     partitions.forEach(topicPartition ->
97     {
98       int partition = topicPartition.partition();
99       isShardOwned[partition] = false;
100       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
101     });
102   }
103
104   @Override
105   public void onPartitionsLost(Collection<TopicPartition> partitions)
106   {
107     log.warn("Lost partitions: {}, partitions");
108     // TODO: Muss auf den Verlust anders reagiert werden?
109     onPartitionsRevoked(partitions);
110   }
111
112   @Override
113   public void run()
114   {
115     consumer.subscribe(List.of(topic));
116
117     running = true;
118
119     while (running)
120     {
121       try
122       {
123         ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
124         log.info("Fetched {} messages", records.count());
125
126         if (loadInProgress)
127         {
128           loadMessages(records);
129
130           if (isLoadingCompleted())
131           {
132             log.info("Loading of messages completed! Pausing all owned partitions...");
133             pauseAllOwnedPartions();
134             log.info("Resuming normal operations...");
135             loadInProgress = false;
136           }
137         }
138         else
139         {
140           if (!records.isEmpty())
141           {
142             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
143           }
144         }
145       }
146       catch (WakeupException e)
147       {
148       }
149       catch (RecordDeserializationException e)
150       {
151       }
152     }
153   }
154
155   void loadMessages(ConsumerRecords<String, MessageTo> records)
156   {
157     for (ConsumerRecord<String, MessageTo> record : records)
158     {
159       nextOffset[record.partition()] = record.offset() + 1;
160       UUID chatRoomId = UUID.fromString(record.key());
161       MessageTo messageTo = record.value();
162
163       Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
164
165       Instant instant = Instant.ofEpochSecond(record.timestamp());
166       LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
167
168       Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
169
170       ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
171       KafkaChatRoomService kafkaChatRoomService =
172           (KafkaChatRoomService) chatRoom.getChatRoomService();
173
174       kafkaChatRoomService.persistMessage(message);
175     }
176   }
177
178   boolean isLoadingCompleted()
179   {
180     return IntStream
181         .range(0, numShards)
182         .filter(shard -> isShardOwned[shard])
183         .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
184         .collect(
185             () -> Boolean.TRUE,
186             (acc, v) -> Boolean.valueOf(acc && v),
187             (a, b) -> Boolean.valueOf(a && b));
188   }
189
190   void pauseAllOwnedPartions()
191   {
192     consumer.pause(IntStream
193         .range(0, numShards)
194         .filter(shard -> isShardOwned[shard])
195         .mapToObj(shard -> new TopicPartition(topic, shard))
196         .toList());
197   }
198
199   Mono<Message> sendMessage(
200       UUID chatRoomId,
201       Message.MessageKey key,
202       LocalDateTime timestamp,
203       String text)
204   {
205     int shard = this.shardingStrategy.selectShard(chatRoomId);
206     TopicPartition tp = new TopicPartition(topic, shard);
207     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
208     return Mono.create(sink ->
209     {
210       ProducerRecord<String, MessageTo> record =
211           new ProducerRecord<>(
212               tp.topic(),
213               tp.partition(),
214               zdt.toEpochSecond(),
215               chatRoomId.toString(),
216               MessageTo.of(key.getUsername(), key.getMessageId(), text));
217
218       producer.send(record, ((metadata, exception) ->
219       {
220         if (metadata != null)
221         {
222           // On successful send
223           Message message = new Message(key, metadata.offset(), timestamp, text);
224           log.info("Successfully send message {}", message);
225           sink.success(message);
226         }
227         else
228         {
229           // On send-failure
230           log.error(
231               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
232               chatRoomId,
233               key,
234               timestamp,
235               text,
236               exception);
237           sink.error(exception);
238         }
239       }));
240     });
241   }
242
243
244   @Override
245   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
246   {
247     if (loadInProgress)
248     {
249       throw new ShardNotOwnedException(shard);
250     }
251     else
252     {
253       return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
254     }
255   }
256
257   @Override
258   public Flux<ChatRoom> getChatRooms(int shard)
259   {
260     if (loadInProgress)
261     {
262       throw new ShardNotOwnedException(shard);
263     }
264     else
265     {
266       return Flux.fromStream(chatRoomMaps[shard].values().stream());
267     }
268   }
269 }