private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
+ private final Duration pollingInterval;
private final int numShards;
private final String[] shardOwners;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
private final String instanceUri;
+ private final ChannelMediator channelMediator;
private boolean running;
@Getter
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
- URI instanceUri)
+ Duration pollingInterval,
+ int numShards,
+ URI instanceUri,
+ ChannelMediator channelMediator)
{
log.debug(
"Creating InfoChannel for topic {}",
this.producer = producer;
this.chatRoomInfo = new HashMap<>();
- this.numShards = consumer
- .partitionsFor(topic)
- .size();
+ this.pollingInterval = pollingInterval;
+
+ this.numShards = numShards;
this.shardOwners = new String[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
.forEach(partition -> this.nextOffset[partition] = -1l);
this.instanceUri = instanceUri.toASCIIString();
+
+ this.channelMediator = channelMediator;
}
{
try
{
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
log.debug("Fetched {} messages", records.count());
for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
chatRoomId);
this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+ this.channelMediator.chatRoomCreated(chatRoomInfo);
}
}