X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatHomeService.java;h=38aecd187e246ea906e9bb228b8e69cd00a1214c;hb=13c51b4630177e7f6649500a3d4b876a12509af6;hp=4460432254379ca6ca0a420db8e4d795f03aa4d7;hpb=394aa7848abcfc7b4510cd2688c00fce01b3c225;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index 44604322..38aecd18 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -4,186 +4,48 @@ import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.Duration; -import java.time.ZoneId; import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; @Slf4j -public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener +public class KafkaChatHomeService implements ChatHomeService { - private final ExecutorService executorService; - private final Consumer consumer; - private final Producer producer; - private final String topic; - private final ZoneId zoneId; - // private final long[] offsets; Erst mal immer alles neu einlesen - private final boolean[] isShardOwned; - private final Map[] chatRoomMaps; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final ChatMessageChannel chatMessageChanel; - private boolean running; - - public KafkaChatHomeService( - ExecutorService executorService, - Consumer consumer, - Producer producer, - String topic, - ZoneId zoneId, - int numShards) + public KafkaChatHomeService(ChatMessageChannel chatMessageChannel) { log.debug("Creating KafkaChatHomeService"); - this.executorService = executorService; - this.consumer = consumer; - this.producer = producer; - this.topic = topic; - this.zoneId = zoneId; - // this.offsets = new long[numShards]; - // for (int i=0; i< numShards; i++) - // { - // this.offsets[i] = 0l; - // } - this.isShardOwned = new boolean[numShards]; - this.chatRoomMaps = new Map[numShards]; - } - - - @Override - public void onPartitionsAssigned(Collection partitions) - { - try - { - lock.writeLock().lock(); - - consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - if (!topicPartition.topic().equals(topic)) - { - log.warn("Ignoring partition from unwanted topic: {}", topicPartition); - return; - } - - int partition = topicPartition.partition(); - long unseenOffset = 0; // offsets[partition]; - - log.info( - "Loading messages from partition {}: start-offset={} -> current-offset={}", - partition, - unseenOffset, - currentOffset); - - // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]! - consumer.seek(topicPartition, unseenOffset); - }); - - consumer.resume(partitions); - } - finally - { - lock.writeLock().unlock(); - } - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - if (!topicPartition.topic().equals(topic)) - { - log.warn("Ignoring partition from unwanted topic: {}", topicPartition); - return; - } - - int partition = topicPartition.partition(); - // long unseenOffset = offsets[partition]; TODO: Offset merken...? - }); - log.info("Revoked partitions: {}", partitions); + this.chatMessageChanel = chatMessageChannel; } - @Override - public void onPartitionsLost(Collection partitions) - { - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } - - @Override - public void run() - { - consumer.subscribe(List.of(topic)); - - running = true; - - try - { - while (running) - { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - for (ConsumerRecord record : records) - { - - } - } - } - } @Override public Mono getChatRoom(int shard, UUID id) { - if (lock.readLock().tryLock()) + if (chatMessageChanel.isLoadInProgress()) { - try - { - return Mono.justOrEmpty(chatRoomMaps[shard].get(id)); - } - finally - { - lock.readLock().unlock(); - } + throw new ShardNotOwnedException(shard); } else { - throw new ShardNotOwnedException(shard); + return chatMessageChanel.getChatRoom(shard, id); } } @Override public Flux getChatRooms(int shard) { - if (lock.readLock().tryLock()) + if (chatMessageChanel.isLoadInProgress()) { - try - { - return Flux.fromStream(chatRoomMaps[shard].values().stream()); - } - finally - { - lock.readLock().unlock(); - } + throw new ShardNotOwnedException(shard); } else { - throw new ShardNotOwnedException(shard); + return chatMessageChanel.getChatRooms(shard); } } }