* Switched `ChatRoomData` from a multicast- to a replay-sink.
* Before, listening was implemented with a multicast-sink, that enabled
back-pressure.
* Now, it was refactored to use a replay-sink, that enables a (configurable)
limitted replay.
{
private String instanceId = "DEV";
private String allowedOrigins = "http://localhost:4200";
- private int chatroomBufferSize = 1024;
+ private int chatroomHistoryLimit = 100;
private ServiceType services = ServiceType.inmemory;
private InMemoryServicesProperties inmemory = new InMemoryServicesProperties();
private KafkaServicesProperties kafka = new KafkaServicesProperties();
private final ChatMessageService service;
private final Clock clock;
- private final int bufferSize;
+ private final int historyLimit;
private Sinks.Many<Message> sink;
public ChatRoomData(
Clock clock,
ChatMessageService service,
- int bufferSize)
+ int historyLimit)
{
- log.info("Created ChatRoom with buffer-size {}", bufferSize);
+ log.info("Created ChatRoom with history-limit {}", historyLimit);
this.clock = clock;
this.service = service;
- this.bufferSize = bufferSize;
+ this.historyLimit = historyLimit;
// @RequiredArgsConstructor unfortunately not possible, because
- // the `bufferSize` is not set, if `createSink()` is called
+ // the `historyLimit` is not set, if `createSink()` is called
// from the variable declaration!
this.sink = createSink();
}
{
return Sinks
.many()
- .multicast()
- .onBackpressureBuffer(bufferSize);
+ .replay()
+ .limit(historyLimit);
}
}
{
SimpleChatHomeService chatHomeService = new SimpleChatHomeService(
clock,
- properties.getChatroomBufferSize());
+ properties.getChatroomHistoryLimit());
chatHomeService.restore(storageStrategy).block();
return chatHomeService;
}
SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService(
shard,
clock,
- properties.getChatroomBufferSize());
+ properties.getChatroomHistoryLimit());
service.restore(storageStrategy).block();
});
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private final Map<UUID, ChatRoomData> chatRoomData;
private final Clock clock;
- private final int bufferSize;
+ private final int historyLimit;
public SimpleChatHomeService(
Clock clock,
- int bufferSize)
+ int historyLimit)
{
this(
null,
clock,
- bufferSize);
+ historyLimit);
}
public SimpleChatHomeService(
Integer shard,
Clock clock,
- int bufferSize)
+ int historyLimit)
{
log.debug("Creating SimpleChatHomeService");
this.chatRoomInfo = new HashMap<>();
this.chatRoomData = new HashMap<>();
this.clock = clock;
- this.bufferSize = bufferSize;
+ this.historyLimit = historyLimit;
}
new ChatRoomData(
clock,
chatMessageService,
- bufferSize));
+ historyLimit));
return chatMessageService.restore(storageStrategy);
})
@Override
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
- log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+ log.info("Creating ChatRoom with history-limit {}", historyLimit);
ChatMessageService service = new InMemoryChatMessageService(id);
ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
this.chatRoomInfo.put(id, chatRoomInfo);
- ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ ChatRoomData chatRoomData = new ChatRoomData(clock, service, historyLimit);
this.chatRoomData.put(id, chatRoomData);
return Mono.just(chatRoomInfo);
}
private final ZoneId zoneId;
private final int numShards;
private final Duration pollingInterval;
- private final int bufferSize;
+ private final int historyLimit;
private final Clock clock;
private final boolean[] isShardOwned;
private final long[] currentOffset;
ZoneId zoneId,
int numShards,
Duration pollingInterval,
- int bufferSize,
+ int historyLimit,
Clock clock,
ChannelMediator channelMediator,
ShardingPublisherStrategy shardingPublisherStrategy)
this.zoneId = zoneId;
this.numShards = numShards;
this.pollingInterval = pollingInterval;
- this.bufferSize = bufferSize;
+ this.historyLimit = historyLimit;
this.clock = clock;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
}
else
{
- log.info("Creating ChatRoomData {} with buffer-size {}", chatRoomId, bufferSize);
+ log.info("Creating ChatRoomData {} with history-limit {}", chatRoomId, historyLimit);
KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
- chatRoomData = new ChatRoomData(clock, service, bufferSize);
+ chatRoomData = new ChatRoomData(clock, service, historyLimit);
this.chatRoomData[shard].put(chatRoomId, chatRoomData);
}
zoneId,
properties.getKafka().getNumPartitions(),
properties.getKafka().getPollingInterval(),
- properties.getChatroomBufferSize(),
+ properties.getChatroomHistoryLimit(),
clock,
channelMediator,
shardingPublisherStrategy);