1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
7 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import org.apache.kafka.common.errors.RecordDeserializationException;
17 import org.apache.kafka.common.errors.WakeupException;
18 import reactor.core.publisher.Flux;
19 import reactor.core.publisher.Mono;
23 import java.util.concurrent.ExecutorService;
24 import java.util.stream.IntStream;
28 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
30 private final ExecutorService executorService;
31 private final Consumer<String, MessageTo> consumer;
32 private final Producer<String, MessageTo> producer;
33 private final String topic;
34 private final ZoneId zoneId;
35 private final int numShards;
36 private final boolean[] isShardOwned;
37 private final long[] currentOffset;
38 private final long[] nextOffset;
39 private final Map<UUID, ChatRoom>[] chatRoomMaps;
40 private final KafkaLikeShardingStrategy shardingStrategy;
42 private boolean running;
43 private volatile boolean loadInProgress;
46 public KafkaChatHomeService(
47 ExecutorService executorService,
48 Consumer<String, MessageTo> consumer,
49 Producer<String, MessageTo> producer,
54 log.debug("Creating KafkaChatHomeService");
55 this.executorService = executorService;
56 this.consumer = consumer;
57 this.producer = producer;
60 this.numShards = numShards;
61 this.isShardOwned = new boolean[numShards];
62 this.currentOffset = new long[numShards];
63 this.nextOffset = new long[numShards];
64 this.chatRoomMaps = new Map[numShards];
65 this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
70 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
72 log.info("Newly assigned partitions! Pausing normal operations...");
73 loadInProgress = true;
75 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
77 int partition = topicPartition.partition();
78 isShardOwned[partition] = true;
79 this.currentOffset[partition] = currentOffset;
82 "Partition assigned: {} - loading messages: next={} -> current={}",
84 nextOffset[partition],
87 consumer.seek(topicPartition, nextOffset[partition]);
90 consumer.resume(partitions);
94 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
96 partitions.forEach(topicPartition ->
98 int partition = topicPartition.partition();
99 isShardOwned[partition] = false;
100 log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
105 public void onPartitionsLost(Collection<TopicPartition> partitions)
107 log.warn("Lost partitions: {}, partitions");
108 // TODO: Muss auf den Verlust anders reagiert werden?
109 onPartitionsRevoked(partitions);
115 consumer.subscribe(List.of(topic));
123 ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
124 log.info("Fetched {} messages", records.count());
128 loadMessages(records);
130 if (isLoadingCompleted())
132 log.info("Loading of messages completed! Pausing all owned partitions...");
133 pauseAllOwnedPartions();
134 log.info("Resuming normal operations...");
135 loadInProgress = false;
140 if (!records.isEmpty())
142 throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
146 catch (WakeupException e)
149 catch (RecordDeserializationException e)
155 void loadMessages(ConsumerRecords<String, MessageTo> records)
157 for (ConsumerRecord<String, MessageTo> record : records)
159 nextOffset[record.partition()] = record.offset() + 1;
160 UUID chatRoomId = UUID.fromString(record.key());
161 MessageTo messageTo = record.value();
163 Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
165 Instant instant = Instant.ofEpochSecond(record.timestamp());
166 LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
168 Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
170 ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
171 KafkaChatRoomService kafkaChatRoomService =
172 (KafkaChatRoomService) chatRoom.getChatRoomService();
174 kafkaChatRoomService.persistMessage(message);
178 boolean isLoadingCompleted()
182 .filter(shard -> isShardOwned[shard])
183 .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
186 (acc, v) -> Boolean.valueOf(acc && v),
187 (a, b) -> Boolean.valueOf(a && b));
190 void pauseAllOwnedPartions()
192 consumer.pause(IntStream
194 .filter(shard -> isShardOwned[shard])
195 .mapToObj(shard -> new TopicPartition(topic, shard))
199 Mono<Message> sendMessage(
201 Message.MessageKey key,
202 LocalDateTime timestamp,
205 int shard = this.shardingStrategy.selectShard(chatRoomId);
206 TopicPartition tp = new TopicPartition(topic, shard);
207 ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
208 return Mono.create(sink ->
210 ProducerRecord<String, MessageTo> record =
211 new ProducerRecord<>(
215 chatRoomId.toString(),
216 MessageTo.of(key.getUsername(), key.getMessageId(), text));
218 producer.send(record, ((metadata, exception) ->
220 if (metadata != null)
222 // On successful send
223 Message message = new Message(key, metadata.offset(), timestamp, text);
224 log.info("Successfully send message {}", message);
225 sink.success(message);
231 "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
237 sink.error(exception);
245 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
249 throw new ShardNotOwnedException(shard);
253 return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
258 public Flux<ChatRoom> getChatRooms(int shard)
262 throw new ShardNotOwnedException(shard);
266 return Flux.fromStream(chatRoomMaps[shard].values().stream());