1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
13 import java.time.ZoneId;
/**
 * {@link ChatHomeService} implementation that is also registered-capable as a
 * Kafka {@link ConsumerRebalanceListener}: it keeps a Kafka consumer/producer
 * pair, the topic name, a time zone and two per-shard arrays (loaders and
 * chat-room maps).
 *
 * NOTE(review): this listing omits some lines of the original file (braces,
 * annotations, some statements), so only what is visible is documented here.
 */
18 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
20 private final Consumer<String, MessageTo> consumer;
21 private final Producer<String, MessageTo> producer;
// Topic name that partitions are compared against in the rebalance callbacks.
22 private final String topic;
23 private final ZoneId zoneId;
24 // private final long[] offsets; For now, always re-read everything from the beginning
// Sized with numShards in the constructor; indexed by partition number in onPartitionsAssigned.
25 private final ChatHomeLoader[] chatHomeLoaders;
// One map of chat rooms (keyed by chat-room id) per shard; sized with numShards in the constructor.
26 private final Map<UUID, ChatRoom>[] chatRoomMaps;
/**
 * Creates the service: logs a debug message, stores the given consumer and
 * producer, and allocates the per-shard arrays for the loaders and the
 * chat-room maps with {@code numShards} entries each.
 *
 * NOTE(review): the remaining constructor parameters (presumably topic,
 * zoneId and numShards) and their assignments are not visible in this
 * listing -- confirm against the full file.
 *
 * @param consumer the Kafka consumer stored in this service
 * @param producer the Kafka producer stored in this service
 */
29 public KafkaChatHomeService(
30 Consumer<String, MessageTo> consumer,
31 Producer<String, MessageTo> producer,
36 log.debug("Creating KafkaChatHomeService");
37 this.consumer = consumer;
38 this.producer = producer;
// Offset tracking is currently disabled (see the commented-out code below).
41 // this.offsets = new long[numShards];
42 // for (int i=0; i< numShards; i++)
44 // this.offsets[i] = 0l;
46 this.chatHomeLoaders = new ChatHomeLoader[numShards];
// Raw-typed array creation: Java does not allow creating a generic array directly.
47 this.chatRoomMaps = new Map[numShards];
/**
 * Rebalance callback for newly assigned partitions. Queries the end offsets
 * of the given partitions from the consumer and, for each partition, seeks
 * the consumer to the start offset and installs a new {@link ChatHomeLoader}
 * in the slot of that partition.
 *
 * Partitions whose topic differs from the configured topic are reported with
 * a warning (NOTE(review): they are presumably skipped by a return that is
 * not visible in this listing -- confirm).
 *
 * @param partitions the partitions handed to this consumer
 */
52 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
54 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
56 if (!topicPartition.topic().equals(topic))
58 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
62 int partition = topicPartition.partition();
// The start offset is hard-coded to 0, so the whole partition is read again on every assignment.
63 long unseenOffset = 0; // offsets[partition];
66 "Loading messages from partition {}: start-offset={} -> current-offset={}",
71 // TODO: reuse! Do not reload everything every time, only starting from offsets[partition]!
72 consumer.seek(topicPartition, unseenOffset);
73 chatHomeLoaders[partition] = new ChatHomeLoader(
/**
 * Rebalance callback for revoked partitions. Walks over the given partitions,
 * warns about partitions of a topic other than the configured one, logs an
 * info line per partition and finally logs the revoked partitions.
 *
 * NOTE(review): the body reads {@code offsets[partition]}, but the
 * {@code offsets} field declaration is commented out in this class, and
 * {@code currentOffset} has no visible declaration inside this lambda.
 * Unless both are declared on lines missing from this listing, this method
 * does not compile -- confirm and fix.
 *
 * @param partitions the partitions taken away from this consumer
 */
81 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
83 partitions.forEach(tp ->
85 if (!tp.topic().equals(topic))
87 log.warn("Ignoring partition from unwanted topic: {}", tp);
91 int partition = tp.partition();
92 long unseenOffset = offsets[partition];
94 log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
96 log.info("Revoked partitions: {}", partitions);
/**
 * Rebalance callback for lost partitions. In the visible code it only logs
 * the given partitions at info level.
 *
 * NOTE(review): the log text says "Revoked partitions", identical to the
 * message in onPartitionsRevoked -- possibly a copy-paste leftover; confirm
 * whether "Lost partitions" was intended.
 *
 * @param partitions the partitions reported as lost
 */
100 public void onPartitionsLost(Collection<TopicPartition> partitions)
102 log.info("Revoked partitions: {}", partitions);
// NOTE(review): the declaration that encloses this fragment is not visible in
// this listing, and several continuation lines are missing as well.
// Visible behavior: collects a set of owned shard numbers ("owned") with a
// three-argument collect (supplier / accumulator / combiner), assigns each
// entry of chatRoomMaps for the shards 0..numShards-1 depending on whether the
// shard is owned, logs chat rooms whose shard is not owned as ignored, and
// puts the remaining chat rooms into the map of their shard, keyed by id.
107 Set<Integer> owned = Arrays
110 () -> new HashSet<>(),
111 (set, i) -> set.add(i),
112 (a, b) -> a.addAll(b));
113 for (int shard = 0; shard < numShards; shard++)
115 chatRoomMaps[shard] = owned.contains(shard)
122 if (owned.contains(chatRoom.getShard()))
128 log.info("Ignoring not owned chat-room {}", chatRoom);
133 .forEach(chatroom -> chatRoomMaps[chatroom.getShard()].put(chatroom.getId(), chatroom));
/**
 * Stores the given chat room in the map of its shard, keyed by its id, and
 * returns it wrapped in a {@link Mono}. An existing entry with the same id
 * is replaced.
 *
 * NOTE(review): the map slot of the shard is dereferenced without a check;
 * presumably slots of shards that are not owned are unset, which would make
 * this throw -- confirm how chatRoomMaps is populated.
 *
 * @param chatRoom the chat room to store
 * @return a Mono emitting the stored chat room
 */
137 public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
139 chatRoomMaps[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
140 return Mono.just(chatRoom);
/**
 * Looks up a chat room by id in the map of the given shard.
 *
 * NOTE(review): the map slot of the shard is dereferenced without a check;
 * presumably slots of shards that are not owned are unset -- confirm.
 *
 * @param shard index into the per-shard array of chat-room maps
 * @param id the id of the requested chat room
 * @return a Mono emitting the chat room, or an empty Mono if the map holds no entry for the id
 */
144 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
146 return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
/**
 * Returns all chat rooms stored in the map of the given shard as a
 * {@link Flux}.
 *
 * NOTE(review): the Flux is built from a single Stream instance, so the
 * returned Flux can presumably be subscribed only once; Flux.fromIterable on
 * the values collection would avoid that -- confirm against the Reactor docs.
 *
 * @param shard index into the per-shard array of chat-room maps
 * @return a Flux emitting the chat rooms of that shard
 */
150 public Flux<ChatRoom> getChatRooms(int shard)
152 return Flux.fromStream(chatRoomMaps[shard].values().stream());