1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.Message;
6 import lombok.RequiredArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.Consumer;
9 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
10 import org.apache.kafka.common.TopicPartition;
11 import reactor.core.publisher.Flux;
12 import reactor.core.publisher.Mono;
18 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
20 private final Consumer<String, MessageTo> consumer;
21 private final String topic;
22 private final long[] offsets;
23 private final MessageHandler[] handlers;
24 private final Map<UUID, ChatRoom>[] chatrooms;
27 public KafkaChatHomeService(
28 Consumer<String, MessageTo> consumer,
32 log.debug("Creating KafkaChatHomeService");
33 this.consumer = consumer;
35 this.offsets = new long[numShards];
36 this.handlers = new MessageHandler[numShards];
37 for (int i=0; i< numShards; i++)
40 this.handlers[i] = new MessageHandler(new TopicPartition(topic, i));
42 this.chatrooms = new Map[numShards];
47 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
49 consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
51 if (!tp.topic().equals(topic))
53 log.warn("Ignoring partition from unwanted topic: {}", tp);
57 int partition = tp.partition();
58 long unseenOffset = offsets[partition];
60 log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
61 handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
66 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
68 partitions.forEach(tp ->
70 if (!tp.topic().equals(topic))
72 log.warn("Ignoring partition from unwanted topic: {}", tp);
76 int partition = tp.partition();
77 long unseenOffset = offsets[partition];
79 log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
81 log.info("Revoked partitions: {}", partitions);
85 public void onPartitionsLost(Collection<TopicPartition> partitions)
87 log.info("Revoked partitions: {}", partitions);
92 Set<Integer> owned = Arrays
95 () -> new HashSet<>(),
96 (set, i) -> set.add(i),
97 (a, b) -> a.addAll(b));
98 for (int shard = 0; shard < numShards; shard++)
100 chatrooms[shard] = owned.contains(shard)
107 if (owned.contains(chatRoom.getShard()))
113 log.info("Ignoring not owned chat-room {}", chatRoom);
118 .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
122 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
124 chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
125 return Mono.just(chatRoom);
129 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
131 return Mono.justOrEmpty(chatrooms[shard].get(id));
135 public Flux<ChatRoom> getChatRooms(int shard)
137 return Flux.fromStream(chatrooms[shard].values().stream());
146 interface MessageHandlingStrategy
148 MessageHandlingStrategy handleMessage(Message message);
152 @RequiredArgsConstructor
153 class NoOpMessageHandlingStrategy implements MessageHandlingStrategy
155 private final TopicPartition tp;
158 public MessageHandlingStrategy handleMessage(Message message)
160 log.warn("Not handling message {} for partition {}", message, tp);
165 class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
167 private final TopicPartition tp;
168 private final long currentOffset;
169 private final long unseenOffset;
171 ChatRoomLoadingMessageHandlingStrategy(TopicPartition tp, long currentOffset, long unseenOffset)
174 this.currentOffset = currentOffset;
175 this.unseenOffset = unseenOffset;
177 consumer.seek(tp, unseenOffset);
181 public MessageHandlingStrategy handleMessage(Message message)
188 @RequiredArgsConstructor
189 class DefaultMessageHandlingStrategy implements MessageHandlingStrategy
191 private final TopicPartition tp;
194 public MessageHandlingStrategy handleMessage(Message message)
196 chatrooms[tp.partition()].put()