this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
this.chatrooms = new Map[numShards];
+ IntStream
+ .range(0, numShards)
+ .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
if (chatRoom == null)
{
- // Alles pausieren und erst von putChatRoom wieder resumen lassen!
+ // TODO: Alles pausieren und erst von putChatRoom wieder resumen lassen!
}
KafkaChatRoomService kafkaChatRoomService =
(KafkaChatRoomService) chatRoom.getChatRoomService();
return IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard])
- .collect(
- () -> Boolean.TRUE, // TODO: Boolean is immutable
- (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable
- (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable
+ .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
}
void pauseAllOwnedPartions()