1 package de.juplo.kafka.chat.backend.persistence.kafka;
3 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
4 import de.juplo.kafka.chat.backend.domain.ChatRoom;
5 import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.consumer.Consumer;
8 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
9 import org.apache.kafka.clients.consumer.ConsumerRecord;
10 import org.apache.kafka.clients.consumer.ConsumerRecords;
11 import org.apache.kafka.clients.producer.Producer;
12 import org.apache.kafka.common.TopicPartition;
13 import reactor.core.publisher.Flux;
14 import reactor.core.publisher.Mono;
16 import java.time.Duration;
17 import java.time.ZoneId;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReadWriteLock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import java.util.concurrent.locks.ReentrantReadWriteLock;
// ChatHomeService implementation that is also the Runnable driving the Kafka
// consumer and the ConsumerRebalanceListener reacting to partition changes.
29 public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
// executorService, consumer and producer are assigned from constructor arguments.
31 private final ExecutorService executorService;
32 private final Consumer<String, MessageTo> consumer;
33 private final Producer<String, MessageTo> producer;
// Name of the topic of interest; the rebalance callbacks ignore partitions of any other topic.
34 private final String topic;
35 private final ZoneId zoneId;
36 // private final long[] offsets; For now, always re-read everything
// Per-shard state; both arrays are allocated with numShards entries in the constructor.
// NOTE(review): presumably the shard index equals the Kafka partition number - confirm.
37 private final boolean[] isShardOwned;
38 private final Map<UUID, ChatRoom>[] chatRoomMaps;
// The write lock is held in onPartitionsAssigned; getChatRoom/getChatRooms use tryLock() on the read lock.
39 private final ReadWriteLock lock = new ReentrantReadWriteLock();
// NOTE(review): not read or written in the visible lines; if it is shared between
// threads, confirm whether it needs to be volatile.
41 private boolean running;
/**
 * Creates the service: stores the collaborators and allocates the per-shard arrays.
 *
 * @param executorService stored in the field of the same name
 * @param consumer Kafka consumer stored for later subscribe/poll/seek calls
 * @param producer Kafka producer stored in the field of the same name
 */
// NOTE(review): the remaining parameters (numShards is used below) are not visible here.
44 public KafkaChatHomeService(
45 ExecutorService executorService,
46 Consumer<String, MessageTo> consumer,
47 Producer<String, MessageTo> producer,
52 log.debug("Creating KafkaChatHomeService");
53 this.executorService = executorService;
54 this.consumer = consumer;
55 this.producer = producer;
// Offset bookkeeping is currently disabled (kept as commented-out code).
58 // this.offsets = new long[numShards];
59 // for (int i=0; i< numShards; i++)
61 // this.offsets[i] = 0l;
63 this.isShardOwned = new boolean[numShards];
// Java forbids generic array creation, hence the raw Map[]; every entry starts out null.
64 this.chatRoomMaps = new Map[numShards];
/**
 * Rebalance callback for newly assigned partitions: while holding the write lock,
 * looks up the end offset of each partition, rewinds the consumer to the start
 * offset and resumes the assigned partitions.
 *
 * @param partitions the partitions passed in by the rebalance callback
 */
69 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
// Readers only tryLock() the read lock, so they are turned away while this lock is held.
73 lock.writeLock().lock();
75 consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
// Partitions that do not belong to the configured topic are only logged.
77 if (!topicPartition.topic().equals(topic))
79 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
83 int partition = topicPartition.partition();
// Start offset is hard-coded to 0 as long as the offsets array is disabled.
84 long unseenOffset = 0; // offsets[partition];
87 "Loading messages from partition {}: start-offset={} -> current-offset={}",
92 // TODO: reuse! Do not reload everything every time, only from offsets[partition] onwards!
93 consumer.seek(topicPartition, unseenOffset);
96 consumer.resume(partitions);
100 lock.writeLock().unlock();
/**
 * Rebalance callback for revoked partitions: walks over the partitions, warns about
 * those of other topics and logs the revocation.
 *
 * @param partitions the partitions passed in by the rebalance callback
 */
105 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
107 partitions.forEach(topicPartition ->
109 if (!topicPartition.topic().equals(topic))
111 log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
// NOTE(review): partition is not used in the visible lines - confirm it is needed.
115 int partition = topicPartition.partition();
116 // long unseenOffset = offsets[partition]; TODO: Remember the offset...?
118 log.info("Revoked partitions: {}", partitions);
/**
 * Rebalance callback for lost partitions; currently handled exactly like a revocation.
 *
 * @param partitions the partitions passed in by the rebalance callback
 */
122 public void onPartitionsLost(Collection<TopicPartition> partitions)
124 // TODO: Does a loss have to be handled differently?
125 onPartitionsRevoked(partitions);
// NOTE(review): presumably the body of run() (the class implements Runnable); the
// method header and the surrounding loop are not visible here.
// Subscribes to the single configured topic.
131 consumer.subscribe(List.of(topic));
// poll() waits up to five minutes for records before returning.
139 ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
140 log.info("Fetched {} messages", records.count());
// Every fetched record is visited; what is done with it is not visible here.
142 for (ConsumerRecord<String, MessageTo> record : records)
/**
 * Looks up a single chat room in the map of the given shard.
 *
 * @param shard index into chatRoomMaps
 * @param id id of the requested chat room
 * @return a Mono with the chat room, or an empty Mono if the map holds no entry for the id
 * @throws ShardNotOwnedException presumably when the read lock cannot be acquired
 *         immediately (the branch structure is not fully visible - confirm)
 */
151 public Mono<ChatRoom> getChatRoom(int shard, UUID id)
// Non-blocking attempt: fails while onPartitionsAssigned holds the write lock.
153 if (lock.readLock().tryLock())
// NOTE(review): a null entry in chatRoomMaps[shard] would raise a NullPointerException
// here, and isShardOwned is not consulted in the visible lines - confirm.
157 return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
161 lock.readLock().unlock();
166 throw new ShardNotOwnedException(shard);
/**
 * Returns all chat rooms stored in the map of the given shard.
 *
 * @param shard index into chatRoomMaps
 * @return a Flux over the values of the shard's map
 * @throws ShardNotOwnedException presumably when the read lock cannot be acquired
 *         immediately (the branch structure is not fully visible - confirm)
 */
171 public Flux<ChatRoom> getChatRooms(int shard)
// Non-blocking attempt: fails while onPartitionsAssigned holds the write lock.
173 if (lock.readLock().tryLock())
// NOTE(review): Flux.fromStream is lazy, so the map values are presumably iterated on
// subscription, after the read lock below has already been released - confirm.
177 return Flux.fromStream(chatRoomMaps[shard].values().stream());
181 lock.readLock().unlock();
186 throw new ShardNotOwnedException(shard);