1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
16 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
18 private final Consumer<String, MessageTo> consumer;
19 private final String topic;
20 private final long[] offsets;
21 private final Map<UUID, ChatRoom>[] chatrooms;
24 public KafkaChatHomeService(
25 Consumer<String, MessageTo> consumer,
29 log.debug("Creating KafkaChatHomeService");
30 this.consumer = consumer;
32 this.offsets = new long[numShards];
33 this.chatrooms = new Map<>[numShards];
34 for (int i=0; i< numShards; i++)
37 this.chatrooms[i] = chat
39 this.chatrooms = new Map[numShards];
40 this.chatrooms = new Map[numShards];
45 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
47 consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
49 if (!tp.topic().equals(topic))
51 log.warn("Ignoring partition from unwanted topic: {}", tp);
55 int partition = tp.partition();
56 long unseenOffset = offsets[partition];
59 "Loading messages from partition {}: start-offset={} -> current-offset={}",
63 handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
68 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
70 partitions.forEach(tp ->
72 if (!tp.topic().equals(topic))
74 log.warn("Ignoring partition from unwanted topic: {}", tp);
78 int partition = tp.partition();
79 long unseenOffset = offsets[partition];
81 log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
83 log.info("Revoked partitions: {}", partitions);
87 public void onPartitionsLost(Collection<TopicPartition> partitions)
89 log.info("Revoked partitions: {}", partitions);
94 Set<Integer> owned = Arrays
97 () -> new HashSet<>(),
98 (set, i) -> set.add(i),
99 (a, b) -> a.addAll(b));
100 for (int shard = 0; shard < numShards; shard++)
102 chatrooms[shard] = owned.contains(shard)
109 if (owned.contains(chatRoom.getShard()))
115 log.info("Ignoring not owned chat-room {}", chatRoom);
120 .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
124 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
126 chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
127 return Mono.just(chatRoom);
131 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
133 return Mono.justOrEmpty(chatrooms[shard].get(id));
137 public Flux<ChatRoom> getChatRooms(int shard)
139 return Flux.fromStream(chatrooms[shard].values().stream());