1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
7 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
8 import lombok.extern.slf4j.Slf4j;
9 import org.apache.kafka.clients.consumer.Consumer;
10 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
11 import org.apache.kafka.clients.consumer.ConsumerRecord;
12 import org.apache.kafka.clients.consumer.ConsumerRecords;
13 import org.apache.kafka.clients.producer.Producer;
14 import org.apache.kafka.clients.producer.ProducerRecord;
15 import org.apache.kafka.common.TopicPartition;
16 import reactor.core.publisher.Flux;
17 import reactor.core.publisher.Mono;
21 import java.util.concurrent.ExecutorService;
25 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
27 private final ExecutorService executorService;
28 private final Consumer<String, MessageTo> consumer;
29 private final Producer<String, MessageTo> producer;
30 private final String topic;
31 private final ZoneId zoneId;
32 // private final long[] offsets; Erst mal immer alles neu einlesen
33 private final boolean[] isShardOwned;
34 private final Map<UUID, ChatRoom>[] chatRoomMaps;
35 private final KafkaLikeShardingStrategy shardingStrategy;
37 private boolean running;
38 private volatile boolean loadInProgress;
41 public KafkaChatHomeService(
42 ExecutorService executorService,
43 Consumer<String, MessageTo> consumer,
44 Producer<String, MessageTo> producer,
49 log.debug("Creating KafkaChatHomeService");
50 this.executorService = executorService;
51 this.consumer = consumer;
52 this.producer = producer;
55 // this.offsets = new long[numShards];
56 // for (int i=0; i< numShards; i++)
58 // this.offsets[i] = 0l;
60 this.isShardOwned = new boolean[numShards];
61 this.chatRoomMaps = new Map[numShards];
62 this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
67 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
69 loadInProgress = true;
71 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
73 if (!topicPartition.topic().equals(topic))
75 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
79 int partition = topicPartition.partition();
80 long unseenOffset = 0; // offsets[partition];
83 "Loading messages from partition {}: start-offset={} -> current-offset={}",
88 // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
89 consumer.seek(topicPartition, unseenOffset);
92 consumer.resume(partitions);
96 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
98 partitions.forEach(topicPartition ->
100 if (!topicPartition.topic().equals(topic))
102 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
106 int partition = topicPartition.partition();
107 // long unseenOffset = offsets[partition]; TODO: Offset merken...?
109 log.info("Revoked partitions: {}", partitions);
113 public void onPartitionsLost(Collection<TopicPartition> partitions)
115 // TODO: Muss auf den Verlust anders reagiert werden?
116 onPartitionsRevoked(partitions);
122 consumer.subscribe(List.of(topic));
130 ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
131 log.info("Fetched {} messages", records.count());
135 for (ConsumerRecord<String, MessageTo> record : records)
137 UUID chatRoomId = UUID.fromString(record.key());
138 MessageTo messageTo = record.value();
139 ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
140 KafkaChatRoomService kafkaChatRoomService =
141 (KafkaChatRoomService) chatRoom.getChatRoomService();
142 Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
143 Instant instant = Instant.ofEpochSecond(record.timestamp());
144 LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
145 Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
146 kafkaChatRoomService.persistMessage(message);
151 if (!records.isEmpty())
153 throw new IllegalStateException("All owned partions should be paused, when no load is in progress!");
160 Mono<Message> sendMessage(
162 Message.MessageKey key,
163 LocalDateTime timestamp,
166 int shard = this.shardingStrategy.selectShard(chatRoomId);
167 TopicPartition tp = new TopicPartition(topic, shard);
168 ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
169 return Mono.create(sink ->
171 ProducerRecord<String, MessageTo> record =
172 new ProducerRecord<>(
176 chatRoomId.toString(),
177 MessageTo.of(key.getUsername(), key.getMessageId(), text));
179 producer.send(record, ((metadata, exception) ->
181 if (metadata != null)
183 // On successful send
184 Message message = new Message(key, metadata.offset(), timestamp, text);
185 log.info("Successfully send message {}", message);
186 sink.success(message);
192 "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
198 sink.error(exception);
206 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
210 throw new ShardNotOwnedException(shard);
214 return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
219 public Flux<ChatRoom> getChatRooms(int shard)
223 throw new ShardNotOwnedException(shard);
227 return Flux.fromStream(chatRoomMaps[shard].values().stream());