b4dccbd1b2538c2e4dc2fcfa3aed1ad25834eecf
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
1 package de.juplo.kafka.chat.backend.persistence.kafka;
2
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
7 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
18
19 import java.time.*;
20 import java.util.*;
21 import java.util.concurrent.ExecutorService;
22 import java.util.stream.IntStream;
23
24
25 @Slf4j
26 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
27 {
28   private final ExecutorService executorService;
29   private final Consumer<String, MessageTo> consumer;
30   private final Producer<String, MessageTo> producer;
31   private final String topic;
32   private final ZoneId zoneId;
33   private final int numShards;
34   private final boolean[] isShardOwned;
35   private final long[] currentOffset;
36   private final long[] nextOffset;
37   private final Map<UUID, ChatRoom>[] chatRoomMaps;
38   private final KafkaLikeShardingStrategy shardingStrategy;
39
40   private boolean running;
41   private volatile boolean loadInProgress;
42
43
44   public KafkaChatHomeService(
45     ExecutorService executorService,
46     Consumer<String, MessageTo> consumer,
47     Producer<String, MessageTo> producer,
48     String topic,
49     ZoneId zoneId,
50     int numShards)
51   {
52     log.debug("Creating KafkaChatHomeService");
53     this.executorService = executorService;
54     this.consumer = consumer;
55     this.producer = producer;
56     this.topic = topic;
57     this.zoneId = zoneId;
58     this.numShards = numShards;
59     this.isShardOwned = new boolean[numShards];
60     this.currentOffset = new long[numShards];
61     this.nextOffset = new long[numShards];
62     this.chatRoomMaps = new Map[numShards];
63     this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
64   }
65
66
67   @Override
68   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
69   {
70     loadInProgress = true;
71
72     consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
73     {
74       int partition = topicPartition.partition();
75       isShardOwned[partition] =  true;
76       this.currentOffset[partition] = currentOffset;
77
78       log.info(
79           "Partition assigned: {} - loading messages: next={} -> current={}",
80           partition,
81           nextOffset[partition],
82           currentOffset);
83
84       consumer.seek(topicPartition, nextOffset[partition]);
85     });
86
87     consumer.resume(partitions);
88   }
89
90   @Override
91   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
92   {
93     partitions.forEach(topicPartition ->
94     {
95       int partition = topicPartition.partition();
96       isShardOwned[partition] = false;
97       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
98     });
99   }
100
101   @Override
102   public void onPartitionsLost(Collection<TopicPartition> partitions)
103   {
104     log.warn("Lost partitions: {}, partitions");
105     // TODO: Muss auf den Verlust anders reagiert werden?
106     onPartitionsRevoked(partitions);
107   }
108
109   @Override
110   public void run()
111   {
112     consumer.subscribe(List.of(topic));
113
114     running = true;
115
116     try
117     {
118       while (running)
119       {
120         ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
121         log.info("Fetched {} messages", records.count());
122
123         if (loadInProgress)
124         {
125           for (ConsumerRecord<String, MessageTo> record : records)
126           {
127             nextOffset[record.partition()] = record.offset() + 1;
128             UUID chatRoomId = UUID.fromString(record.key());
129             MessageTo messageTo = record.value();
130
131             Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
132
133             Instant instant = Instant.ofEpochSecond(record.timestamp());
134             LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
135
136             Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
137
138             ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
139             KafkaChatRoomService kafkaChatRoomService =
140                 (KafkaChatRoomService) chatRoom.getChatRoomService();
141
142             kafkaChatRoomService.persistMessage(message);
143           }
144
145           if (IntStream
146             .range(0, numShards)
147             .filter(shard -> isShardOwned[shard])
148             .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
149             .collect(
150                 () -> Boolean.TRUE,
151                 (acc, v) -> Boolean.valueOf(acc && v),
152                 (a, b) -> Boolean.valueOf(a && b)))
153           {
154             log.info("Loading of messages completed! Pausing all owned partitions...");
155             consumer.pause(IntStream
156                 .range(0, numShards)
157                 .filter(shard -> isShardOwned[shard])
158                 .mapToObj(shard -> new TopicPartition(topic, shard))
159                 .toList());
160             loadInProgress = false;
161           }
162         }
163         else
164         {
165           if (!records.isEmpty())
166           {
167             throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
168           }
169         }
170       }
171     }
172   }
173
174   Mono<Message> sendMessage(
175       UUID chatRoomId,
176       Message.MessageKey key,
177       LocalDateTime timestamp,
178       String text)
179   {
180     int shard = this.shardingStrategy.selectShard(chatRoomId);
181     TopicPartition tp = new TopicPartition(topic, shard);
182     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
183     return Mono.create(sink ->
184     {
185       ProducerRecord<String, MessageTo> record =
186           new ProducerRecord<>(
187               tp.topic(),
188               tp.partition(),
189               zdt.toEpochSecond(),
190               chatRoomId.toString(),
191               MessageTo.of(key.getUsername(), key.getMessageId(), text));
192
193       producer.send(record, ((metadata, exception) ->
194       {
195         if (metadata != null)
196         {
197           // On successful send
198           Message message = new Message(key, metadata.offset(), timestamp, text);
199           log.info("Successfully send message {}", message);
200           sink.success(message);
201         }
202         else
203         {
204           // On send-failure
205           log.error(
206               "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
207               chatRoomId,
208               key,
209               timestamp,
210               text,
211               exception);
212           sink.error(exception);
213         }
214       }));
215     });
216   }
217
218
219   @Override
220   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
221   {
222     if (loadInProgress)
223     {
224       throw new ShardNotOwnedException(shard);
225     }
226     else
227     {
228       return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
229     }
230   }
231
232   @Override
233   public Flux<ChatRoom> getChatRooms(int shard)
234   {
235     if (loadInProgress)
236     {
237       throw new ShardNotOwnedException(shard);
238     }
239     else
240     {
241       return Flux.fromStream(chatRoomMaps[shard].values().stream());
242     }
243   }
244 }