import java.time.*;
import java.util.*;
-import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
@Slf4j
public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
{
- private final ExecutorService executorService;
private final Consumer<String, MessageTo> consumer;
private final Producer<String, MessageTo> producer;
private final String topic;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoom>[] chatRoomMaps;
+ private final Map<UUID, ChatRoom>[] chatrooms;
private final KafkaLikeShardingStrategy shardingStrategy;
private boolean running;
public KafkaChatHomeService(
- ExecutorService executorService,
Consumer<String, MessageTo> consumer,
Producer<String, MessageTo> producer,
String topic,
int numShards)
{
log.debug("Creating KafkaChatHomeService");
- this.executorService = executorService;
this.consumer = consumer;
this.producer = producer;
this.topic = topic;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatRoomMaps = new Map[numShards];
+ this.chatrooms = new Map[numShards];
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
- ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
+ ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
KafkaChatRoomService kafkaChatRoomService =
(KafkaChatRoomService) chatRoom.getChatRoomService();
}
+ public void putChatRoom(ChatRoom chatRoom)
+ {
+ // TODO: Nachricht senden!
+ chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ }
+
@Override
public Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
}
else
{
- return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
}
}
}
else
{
- return Flux.fromStream(chatRoomMaps[shard].values().stream());
+ return Flux.fromStream(chatrooms[shard].values().stream());
}
}
}