1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
/**
 * {@link ChatHomeService} implementation that keeps chat rooms in in-memory
 * maps, one map per shard, and that is also registered-capable as a Kafka
 * {@link ConsumerRebalanceListener}.
 */
16 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
// Kafka consumer: queried for end offsets on assignment and passed to the message handlers.
18 private final Consumer<String, MessageTo> consumer;
// Topic name; the rebalance callbacks ignore partitions of any other topic.
19 private final String topic;
// One offset per partition, indexed by partition number; read as the "unseen" offset in the rebalance callbacks.
20 private final long[] offsets;
// One message handler per partition, indexed by partition number.
21 private final MessageHandler[] handlers;
// One lookup table (chat-room id -> chat room) per shard, indexed by shard number.
22 private final Map<UUID, ChatRoom>[] chatrooms;
/**
 * Creates the service for the given consumer.
 *
 * Allocates the per-shard arrays ({@code offsets}, {@code handlers} and
 * {@code chatrooms}) with {@code numShards} slots each and creates one
 * {@code MessageHandler} per shard.
 *
 * NOTE(review): only part of the parameter list is visible in this view;
 * {@code topic} and {@code numShards} are presumably further constructor
 * parameters -- confirm against the full file.
 *
 * @param consumer the Kafka consumer kept by this service and passed to every handler
 */
25 public KafkaChatHomeService(
26 Consumer<String, MessageTo> consumer,
30 log.debug("Creating KafkaChatHomeService");
31 this.consumer = consumer;
33 this.offsets = new long[numShards];
34 this.handlers = new MessageHandler[numShards];
// One handler per shard, bound to the partition with the same index.
35 for (int i=0; i< numShards; i++)
38 this.handlers[i] = new MessageHandler(consumer, new TopicPartition(topic, i));
// Java does not allow creating an array of a parameterized type, hence the raw Map[] here.
40 this.chatrooms = new Map[numShards];
45 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
47 consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
49 if (!tp.topic().equals(topic))
51 log.warn("Ignoring partition from unwanted topic: {}", tp);
55 int partition = tp.partition();
56 long unseenOffset = offsets[partition];
58 log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
59 handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
64 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
66 partitions.forEach(tp ->
68 if (!tp.topic().equals(topic))
70 log.warn("Ignoring partition from unwanted topic: {}", tp);
74 int partition = tp.partition();
75 long unseenOffset = offsets[partition];
77 log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
79 log.info("Revoked partitions: {}", partitions);
83 public void onPartitionsLost(Collection<TopicPartition> partitions)
85 log.info("Revoked partitions: {}", partitions);
// NOTE(review): fragment of a method body; the enclosing signature and several
// interior lines are not visible in this view, so the comments below only
// describe what the visible lines show.
// Collects integers into a HashSet named "owned" -- presumably the numbers of
// the shards owned by this instance; confirm against the full method.
90 Set<Integer> owned = Arrays
93 () -> new HashSet<>(),
94 (set, i) -> set.add(i),
95 (a, b) -> a.addAll(b));
// Initialises the chat-room slot of every shard; the assigned value depends on
// whether the shard is contained in "owned".
96 for (int shard = 0; shard < numShards; shard++)
98 chatrooms[shard] = owned.contains(shard)
// Chat rooms are checked against the owned shards ...
105 if (owned.contains(chatRoom.getShard()))
// ... and a chat room of a shard that is not owned is reported in the log.
111 log.info("Ignoring not owned chat-room {}", chatRoom);
// Every chat room that reaches this point is stored in the map of its shard, keyed by its id.
116 .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
120 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
122 chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
123 return Mono.just(chatRoom);
127 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
129 return Mono.justOrEmpty(chatrooms[shard].get(id));
133 public Flux<ChatRoom> getChatRooms(int shard)
135 return Flux.fromStream(chatrooms[shard].values().stream());