X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatHomeService.java;h=38aecd187e246ea906e9bb228b8e69cd00a1214c;hb=13c51b4630177e7f6649500a3d4b876a12509af6;hp=e23f08db4677975bac0f5fcbf286a3278ec5d099;hpb=9aaca321143c4fd08859dc14cdd7e602cc1714f6;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index e23f08db..38aecd18 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -2,281 +2,50 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.*; import java.util.*; -import java.util.stream.IntStream; @Slf4j -public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener +public class KafkaChatHomeService implements ChatHomeService { - private final String chatRoomsTopic; - private final Consumer chatRoomsConsumer; - private final Producer chatRoomsProducer; - private final String chatMessagesTopic; - private final Consumer chatMessagesConsumer; - private final Producer chatMessagesProducer; - private final ZoneId zoneId; - private final int numShards; - private final boolean[] isShardOwned; - private final long[] currentOffset; - private final long[] nextOffset; - private final Map[] chatrooms; - private final KafkaLikeShardingStrategy shardingStrategy; + private final ChatMessageChannel chatMessageChanel; - private boolean running; - private volatile boolean loadInProgress; - - public KafkaChatHomeService( - String chatRoomsTopic, - Consumer chatRoomsConsumer, - Producer chatRoomsProducer, - String chatMessagesTopic, - Consumer chatMessagesConsumer, - Producer chatMessagesProducer, - ZoneId zoneId, - int numShards) + public KafkaChatHomeService(ChatMessageChannel chatMessageChannel) { log.debug("Creating KafkaChatHomeService"); - this.chatRoomsTopic = chatRoomsTopic; - this.chatRoomsConsumer = chatRoomsConsumer; - this.chatRoomsProducer = chatRoomsProducer; - this.chatMessagesTopic = chatMessagesTopic; - this.chatMessagesConsumer = chatMessagesConsumer; - this.chatMessagesProducer = chatMessagesProducer; - this.zoneId = zoneId; - this.numShards = numShards; - this.isShardOwned = new boolean[numShards]; - this.currentOffset = new long[numShards]; - this.nextOffset = new long[numShards]; - this.chatrooms = new Map[numShards]; - this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); - } - - - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Newly assigned partitions! Pausing normal operations..."); - loadInProgress = true; - - chatMessagesConsumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = true; - this.currentOffset[partition] = currentOffset; - - log.info( - "Partition assigned: {} - loading messages: next={} -> current={}", - partition, - nextOffset[partition], - currentOffset); - - chatMessagesConsumer.seek(topicPartition, nextOffset[partition]); - }); - - chatMessagesConsumer.resume(partitions); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(topicPartition -> - { - int partition = topicPartition.partition(); - isShardOwned[partition] = false; - log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); - }); + this.chatMessageChanel = chatMessageChannel; } - @Override - public void onPartitionsLost(Collection partitions) - { - log.warn("Lost partitions: {}, partitions"); - // TODO: Muss auf den Verlust anders reagiert werden? - onPartitionsRevoked(partitions); - } - - @Override - public void run() - { - chatMessagesConsumer.subscribe(List.of(chatMessagesTopic)); - - running = true; - - while (running) - { - try - { - ConsumerRecords records = chatMessagesConsumer.poll(Duration.ofMinutes(5)); - log.info("Fetched {} messages", records.count()); - - if (loadInProgress) - { - loadMessages(records); - - if (isLoadingCompleted()) - { - log.info("Loading of messages completed! Pausing all owned partitions..."); - pauseAllOwnedPartions(); - log.info("Resuming normal operations..."); - loadInProgress = false; - } - } - else - { - if (!records.isEmpty()) - { - throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!"); - } - } - } - catch (WakeupException e) - { - } - catch (RecordDeserializationException e) - { - } - } - } - - void loadMessages(ConsumerRecords records) - { - for (ConsumerRecord record : records) - { - nextOffset[record.partition()] = record.offset() + 1; - UUID chatRoomId = UUID.fromString(record.key()); - MessageTo messageTo = record.value(); - - Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); - - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - - Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); - - ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); - - kafkaChatRoomService.persistMessage(message); - } - } - - boolean isLoadingCompleted() - { - return IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) - .collect( - () -> Boolean.TRUE, - (acc, v) -> Boolean.valueOf(acc && v), - (a, b) -> Boolean.valueOf(a && b)); - } - - void pauseAllOwnedPartions() - { - chatMessagesConsumer.pause(IntStream - .range(0, numShards) - .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> new TopicPartition(chatMessagesTopic, shard)) - .toList()); - } - - Mono sendMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - int shard = this.shardingStrategy.selectShard(chatRoomId); - TopicPartition tp = new TopicPartition(chatMessagesTopic, shard); - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - tp.topic(), - tp.partition(), - zdt.toEpochSecond(), - chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); - - chatMessagesProducer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - - - public void putChatRoom(ChatRoom chatRoom) - { - - ProducerRecord record = new ProducerRecord<>(chatRoom.getShard(), ); - // TODO: Nachricht senden! - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); - } @Override public Mono getChatRoom(int shard, UUID id) { - if (loadInProgress) + if (chatMessageChanel.isLoadInProgress()) { throw new ShardNotOwnedException(shard); } else { - return Mono.justOrEmpty(chatrooms[shard].get(id)); + return chatMessageChanel.getChatRoom(shard, id); } } @Override public Flux getChatRooms(int shard) { - if (loadInProgress) + if (chatMessageChanel.isLoadInProgress()) { throw new ShardNotOwnedException(shard); } else { - return Flux.fromStream(chatrooms[shard].values().stream()); + return chatMessageChanel.getChatRooms(shard); } } }