1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.common.TopicPartition;
9 import reactor.core.publisher.Flux;
10 import reactor.core.publisher.Mono;
16 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
18 private final Consumer<String, MessageTo> consumer;
19 private final String topic;
20 // private final long[] offsets; Erst mal immer alles neu einlesen
21 private final Map<UUID, KafkaChatRoomService>[] kafkaChatRoomServiceMaps;
22 private final Map<UUID, ChatRoom>[] chatRoomMaps;
25 public KafkaChatHomeService(
26 Consumer<String, MessageTo> consumer,
30 log.debug("Creating KafkaChatHomeService");
31 this.consumer = consumer;
33 // this.offsets = new long[numShards];
34 // for (int i=0; i< numShards; i++)
36 // this.offsets[i] = 0l;
38 this.kafkaChatRoomServiceMaps = new Map[numShards];
39 this.chatRoomMaps = new Map[numShards];
44 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
46 consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
48 if (!tp.topic().equals(topic))
50 log.warn("Ignoring partition from unwanted topic: {}", tp);
54 int partition = tp.partition();
55 kafkaChatRoomServiceMaps[partition] = new HashMap<>(); // TODO: reuse! Nicht immer alles neu laden
56 long unseenOffset = 0; // offsets[partition];
59 "Loading messages from partition {}: start-offset={} -> current-offset={}",
64 consumer.seek(tp, unseenOffset);
65 chatRoomMaps[partition]
68 handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
73 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
75 partitions.forEach(tp ->
77 if (!tp.topic().equals(topic))
79 log.warn("Ignoring partition from unwanted topic: {}", tp);
83 int partition = tp.partition();
84 long unseenOffset = offsets[partition];
86 log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
88 log.info("Revoked partitions: {}", partitions);
92 public void onPartitionsLost(Collection<TopicPartition> partitions)
94 log.info("Revoked partitions: {}", partitions);
99 Set<Integer> owned = Arrays
102 () -> new HashSet<>(),
103 (set, i) -> set.add(i),
104 (a, b) -> a.addAll(b));
105 for (int shard = 0; shard < numShards; shard++)
107 chatRoomMaps[shard] = owned.contains(shard)
114 if (owned.contains(chatRoom.getShard()))
120 log.info("Ignoring not owned chat-room {}", chatRoom);
125 .forEach(chatroom -> chatRoomMaps[chatroom.getShard()].put(chatroom.getId(), chatroom));
129 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
131 chatRoomMaps[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
132 return Mono.just(chatRoom);
136 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
138 return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
142 public Flux<ChatRoom> getChatRooms(int shard)
144 return Flux.fromStream(chatRoomMaps[shard].values().stream());