private final ZoneId zoneId;
private final int numShards;
private final Duration pollingInterval;
- private final int bufferSize;
+ private final int historyLimit;
private final Clock clock;
private final boolean[] isShardOwned;
private final long[] currentOffset;
ZoneId zoneId,
int numShards,
Duration pollingInterval,
- int bufferSize,
+ int historyLimit,
Clock clock,
ChannelMediator channelMediator,
ShardingPublisherStrategy shardingPublisherStrategy)
this.zoneId = zoneId;
this.numShards = numShards;
this.pollingInterval = pollingInterval;
- this.bufferSize = bufferSize;
+ this.historyLimit = historyLimit;
this.clock = clock;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
partitions.forEach(topicPartition ->
{
int partition = topicPartition.partition();
+ chatRoomData[partition]
+ .values()
+ .forEach(chatRoomData -> chatRoomData.close());
isShardOwned[partition] = false;
nextOffset[partition] = consumer.position(topicPartition);
log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
}
else
{
- log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize);
+ log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
- chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ chatRoomData = new ChatRoomData(clock, service, historyLimit);
this.chatRoomData[shard].put(chatRoomId, chatRoomData);
}