@Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
private final Map<UUID, ChatRoomData>[] chatRoomData;
private boolean running;
private volatile boolean loadInProgress;
- public ChatRoomChannel(
+ public DataChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> consumer,
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatRoomInfo = new Map[numShards];
this.chatRoomData = new Map[numShards];
IntStream
.range(0, numShards)
.forEach(shard ->
{
- this.chatRoomInfo[shard] = new HashMap<>();
this.chatRoomData[shard] = new HashMap<>();
});
}
return Mono.justOrEmpty(chatRoomData[shard].get(id));
}
-
- Flux<ChatRoomInfo> getChatRoomInfo()
- {
- return Flux
- .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
- .filter(shard -> isShardOwned[shard])
- .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
- }
-
- Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
- {
- if (loadInProgress)
- {
- return Mono.error(new LoadInProgressException());
- }
-
- if (!isShardOwned[shard])
- {
- return Mono.error(new ShardNotOwnedException(shard));
- }
-
- return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
- }
}
package de.juplo.kafka.chat.backend.implementation.kafka;
-import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import reactor.core.publisher.Mono;
import java.time.*;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
import java.util.stream.IntStream;
@Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class InfoChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private volatile boolean loadInProgress;
- public ChatRoomChannel(
+ public InfoChannel(
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> consumer,