From: Kai Moritz Date: Sun, 26 Feb 2023 14:22:02 +0000 (+0100) Subject: WIP X-Git-Tag: kafkadata~39 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a5b72cd8868eb96b5282643d423d0ced8dabf069;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java index d467eabb..452f26d1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java @@ -1,68 +1,18 @@ package de.juplo.kafka.chat.backend.domain; import lombok.Getter; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; public class ShardNotOwnedException extends IllegalStateException { @Getter - private final ChatHomeService chatHomeService; - @Getter - private final ChatRoomInfo chatRoomInfo; - @Getter - private final int shard; - @Getter - private final int[] ownedShards; - - - public ShardNotOwnedException( - ChatHomeService chatHomeService, - ChatRoomInfo chatRoomInfo, - int shard, - Collection ownedShards) - { - this( - chatHomeService, - chatRoomInfo, - shard, - ShardNotOwnedException.toArray(ownedShards)); - } - - public ShardNotOwnedException( - ChatHomeService chatHomeService, - ChatRoomInfo chatRoomInfo, - int shard, - int[] ownedShards) - { - super( - chatHomeService + - " does not own the shard " + - shard + - " for ChatRoom " + - chatRoomInfo + - " owned shards: " + - Arrays - .stream(ownedShards) - .mapToObj(ownedShard -> Integer.toString(ownedShard)) - .collect(Collectors.joining(", "))); - this.chatHomeService = chatHomeService; - this.chatRoomInfo = chatRoomInfo; - this.shard = shard; - this.ownedShards = ownedShards; - } + private final TopicPartition topicPartition; - private static int[] toArray(Collection collection) + public ShardNotOwnedException(TopicPartition topicPartition) { - int[] array = new int[collection.size()]; - Iterator iterator = collection.iterator(); - for (int i = 0; iterator.hasNext(); i++) - array[i] = iterator.next(); - return array; + super("This instance does not own the shard for " + topicPartition); + this.topicPartition = topicPartition; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java index af99b809..7e554732 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java @@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.Message; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy @@ -27,9 +30,8 @@ class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy } @Override - public MessageHandlingStrategy handleMessage(Message message) + public Mono handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text) { - // todo - return this; + return null; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java index 892da90d..0da41117 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Mono; @@ -11,7 +12,7 @@ import java.time.LocalDateTime; @RequiredArgsConstructor class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy { - private final TopicPartition tp; + private final TopicPartition topicPartition; @Override public Mono handleMessage( @@ -19,6 +20,6 @@ class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy LocalDateTime timestamp, String text) { - throw new + throw new ShardNotOwnedException(topicPartition); } }