1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
16 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
18 private final Consumer<String, MessageTo> consumer;
19 private final String topic;
20 // private final long[] offsets; Erst mal immer alles neu einlesen
21 private final Map<UUID, ChatRoom>[] chatrooms;
24 public KafkaChatHomeService(
25 Consumer<String, MessageTo> consumer,
29 log.debug("Creating KafkaChatHomeService");
30 this.consumer = consumer;
32 // this.offsets = new long[numShards];
33 // for (int i=0; i< numShards; i++)
35 // this.offsets[i] = 0l;
37 this.chatrooms = new Map[numShards];
42 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
44 consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
46 if (!tp.topic().equals(topic))
48 log.warn("Ignoring partition from unwanted topic: {}", tp);
52 int partition = tp.partition();
53 long unseenOffset = 0; // offsets[partition];
56 "Loading messages from partition {}: start-offset={} -> current-offset={}",
61 consumer.seek(tp, unseenOffset);
65 handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
70 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
72 partitions.forEach(tp ->
74 if (!tp.topic().equals(topic))
76 log.warn("Ignoring partition from unwanted topic: {}", tp);
80 int partition = tp.partition();
81 long unseenOffset = offsets[partition];
83 log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
85 log.info("Revoked partitions: {}", partitions);
89 public void onPartitionsLost(Collection<TopicPartition> partitions)
91 log.info("Revoked partitions: {}", partitions);
96 Set<Integer> owned = Arrays
99 () -> new HashSet<>(),
100 (set, i) -> set.add(i),
101 (a, b) -> a.addAll(b));
102 for (int shard = 0; shard < numShards; shard++)
104 chatrooms[shard] = owned.contains(shard)
111 if (owned.contains(chatRoom.getShard()))
117 log.info("Ignoring not owned chat-room {}", chatRoom);
122 .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
126 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
128 chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
129 return Mono.just(chatRoom);
133 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
135 return Mono.justOrEmpty(chatrooms[shard].get(id));
139 public Flux<ChatRoom> getChatRooms(int shard)
141 return Flux.fromStream(chatrooms[shard].values().stream());