From f26a4e24ac140029d9b1a9aec3a18d31b8f608b7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Sep 2023 23:33:59 +0200 Subject: [PATCH] WIP:ALIGN --- .../implementation/kafka/DataChannel.java | 30 ++----------------- .../implementation/kafka/InfoChannel.java | 13 +++++--- 2 files changed, 11 insertions(+), 32 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index d94bc659..3f4faa5b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -25,7 +25,7 @@ import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class DataChannel implements Runnable, ConsumerRebalanceListener { private final String topic; private final Producer producer; @@ -37,7 +37,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatRoomInfo; private final Map[] chatRoomData; private boolean running; @@ -45,7 +44,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private volatile boolean loadInProgress; - public ChatRoomChannel( + public DataChannel( String topic, Producer producer, Consumer consumer, @@ -68,13 +67,11 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatRoomInfo = new Map[numShards]; this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) .forEach(shard -> { - this.chatRoomInfo[shard] = new HashMap<>(); this.chatRoomData[shard] = new HashMap<>(); }); } @@ -402,27 +399,4 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener return Mono.justOrEmpty(chatRoomData[shard].get(id)); } - - Flux getChatRoomInfo() - { - return Flux - .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) - .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); - } - - Mono getChatRoomInfo(int shard, UUID id) - { - if (loadInProgress) - { - return Mono.error(new LoadInProgressException()); - } - - if (!isShardOwned[shard]) - { - return Mono.error(new ShardNotOwnedException(shard)); - } - - return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index d94bc659..a3a5b43a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -1,6 +1,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka; -import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException; import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; @@ -20,12 +22,15 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.stream.IntStream; @Slf4j -public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener +public class InfoChannel implements Runnable, ConsumerRebalanceListener { private final String topic; private final Producer producer; @@ -45,7 +50,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private volatile boolean loadInProgress; - public ChatRoomChannel( + public InfoChannel( String topic, Producer producer, Consumer consumer, -- 2.20.1