1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import lombok.extern.slf4j.Slf4j;
6 import org.apache.kafka.clients.consumer.Consumer;
7 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
8 import org.apache.kafka.clients.producer.Producer;
9 import org.apache.kafka.common.TopicPartition;
10 import reactor.core.publisher.Flux;
11 import reactor.core.publisher.Mono;
13 import java.time.ZoneId;
18 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
20 private final Consumer<String, MessageTo> consumer;
21 private final Producer<String, MessageTo> producer;
22 private final String topic;
23 private final ZoneId zoneId;
24 // private final long[] offsets; Erst mal immer alles neu einlesen
25 private final ChatHomeLoader[] chatHomeLoaders;
26 private final Map<UUID, ChatRoom>[] chatRoomMaps;
29 public KafkaChatHomeService(
30 Consumer<String, MessageTo> consumer,
31 Producer<String, MessageTo> producer,
36 log.debug("Creating KafkaChatHomeService");
37 this.consumer = consumer;
38 this.producer = producer;
41 // this.offsets = new long[numShards];
42 // for (int i=0; i< numShards; i++)
44 // this.offsets[i] = 0l;
46 this.chatHomeLoaders = new ChatHomeLoader[numShards];
47 this.chatRoomMaps = new Map[numShards];
52 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
54 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
56 if (!topicPartition.topic().equals(topic))
58 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
62 int partition = topicPartition.partition();
63 long unseenOffset = 0; // offsets[partition];
66 "Loading messages from partition {}: start-offset={} -> current-offset={}",
71 // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
72 consumer.seek(topicPartition, unseenOffset);
73 chatHomeLoaders[partition] = new ChatHomeLoader(
81 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
83 partitions.forEach(topicPartition ->
85 if (!topicPartition.topic().equals(topic))
87 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
91 int partition = topicPartition.partition();
92 // long unseenOffset = offsets[partition]; TODO: Offset merken...?
94 log.info("Revoked partitions: {}", partitions);
98 public void onPartitionsLost(Collection<TopicPartition> partitions)
100 // TODO: Muss auf den Verlust anders reagiert werden?
101 onPartitionsRevoked(partitions);
105 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
107 return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
111 public Flux<ChatRoom> getChatRooms(int shard)
113 return Flux.fromStream(chatRoomMaps[shard].values().stream());