package de.juplo.kafka.chat.backend.domain;
import lombok.Getter;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
public class ShardNotOwnedException extends IllegalStateException
{
@Getter
- private final ChatHomeService chatHomeService;
- @Getter
- private final ChatRoomInfo chatRoomInfo;
- @Getter
- private final int shard;
- @Getter
- private final int[] ownedShards;
-
-
- public ShardNotOwnedException(
- ChatHomeService chatHomeService,
- ChatRoomInfo chatRoomInfo,
- int shard,
- Collection<Integer> ownedShards)
- {
- this(
- chatHomeService,
- chatRoomInfo,
- shard,
- ShardNotOwnedException.toArray(ownedShards));
- }
-
- public ShardNotOwnedException(
- ChatHomeService chatHomeService,
- ChatRoomInfo chatRoomInfo,
- int shard,
- int[] ownedShards)
- {
- super(
- chatHomeService +
- " does not own the shard " +
- shard +
- " for ChatRoom " +
- chatRoomInfo +
- " owned shards: " +
- Arrays
- .stream(ownedShards)
- .mapToObj(ownedShard -> Integer.toString(ownedShard))
- .collect(Collectors.joining(", ")));
- this.chatHomeService = chatHomeService;
- this.chatRoomInfo = chatRoomInfo;
- this.shard = shard;
- this.ownedShards = ownedShards;
- }
+ private final TopicPartition topicPartition;
- private static int[] toArray(Collection<Integer> collection)
+ public ShardNotOwnedException(TopicPartition topicPartition)
{
- int[] array = new int[collection.size()];
- Iterator<Integer> iterator = collection.iterator();
- for (int i = 0; iterator.hasNext(); i++)
- array[i] = iterator.next();
- return array;
+ super("This instance does not own the shard for " + topicPartition);
+ this.topicPartition = topicPartition;
}
}
import de.juplo.kafka.chat.backend.domain.Message;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
}
@Override
- public MessageHandlingStrategy handleMessage(Message message)
+ public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
{
- // todo
- return this;
+ return null;
}
}
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Mono;
@RequiredArgsConstructor
class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy
{
- private final TopicPartition tp;
+ private final TopicPartition topicPartition;
@Override
public Mono<Message> handleMessage(
LocalDateTime timestamp,
String text)
{
- throw new
+ throw new ShardNotOwnedException(topicPartition);
}
}