1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
13 import java.time.ZoneId;
15 import java.util.concurrent.ExecutorService;
19 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
21 private final ExecutorService executorService;
22 private final Consumer<String, MessageTo> consumer;
23 private final Producer<String, MessageTo> producer;
24 private final String topic;
25 private final ZoneId zoneId;
26 // private final long[] offsets; Erst mal immer alles neu einlesen
27 private final ChatHomeLoader[] chatHomeLoaders;
28 private final Map<UUID, ChatRoom>[] chatRoomMaps;
31 public KafkaChatHomeService(
32 ExecutorService executorService,
33 Consumer<String, MessageTo> consumer,
34 Producer<String, MessageTo> producer,
39 log.debug("Creating KafkaChatHomeService");
40 this.executorService = executorService;
41 this.consumer = consumer;
42 this.producer = producer;
45 // this.offsets = new long[numShards];
46 // for (int i=0; i< numShards; i++)
48 // this.offsets[i] = 0l;
50 this.chatHomeLoaders = new ChatHomeLoader[numShards];
51 this.chatRoomMaps = new Map[numShards];
56 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
58 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
60 if (!topicPartition.topic().equals(topic))
62 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
66 int partition = topicPartition.partition();
67 long unseenOffset = 0; // offsets[partition];
70 "Loading messages from partition {}: start-offset={} -> current-offset={}",
75 // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
76 consumer.seek(topicPartition, unseenOffset);
77 chatHomeLoaders[partition] = new ChatHomeLoader(
85 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
87 partitions.forEach(topicPartition ->
89 if (!topicPartition.topic().equals(topic))
91 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
95 int partition = topicPartition.partition();
96 // long unseenOffset = offsets[partition]; TODO: Offset merken...?
98 log.info("Revoked partitions: {}", partitions);
102 public void onPartitionsLost(Collection<TopicPartition> partitions)
104 // TODO: Muss auf den Verlust anders reagiert werden?
105 onPartitionsRevoked(partitions);
109 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
111 return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
115 public Flux<ChatRoom> getChatRooms(int shard)
117 return Flux.fromStream(chatRoomMaps[shard].values().stream());