import java.net.URI;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.logging.Level;
private String infoChannelTopic = "info_channel";
private String dataChannelTopic = "data_channel";
private int numPartitions = 2;
+ private Duration pollingInterval = Duration.ofSeconds(1);
private String haproxyRuntimeApi = "haproxy:8401";
private String haproxyMap = "/usr/local/etc/haproxy/sharding.map";
}
private final Consumer<String, AbstractMessageTo> consumer;
private final ZoneId zoneId;
private final int numShards;
+ private final Duration pollingInterval;
private final int bufferSize;
private final Clock clock;
private final boolean[] isShardOwned;
Consumer<String, AbstractMessageTo> dataChannelConsumer,
ZoneId zoneId,
int numShards,
+ Duration pollingInterval,
int bufferSize,
Clock clock,
InfoChannel infoChannel,
this.producer = producer;
this.zoneId = zoneId;
this.numShards = numShards;
+ this.pollingInterval = pollingInterval;
this.bufferSize = bufferSize;
this.clock = clock;
this.isShardOwned = new boolean[numShards];
{
try
{
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
log.info("Fetched {} messages", records.count());
if (loadInProgress)
private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
+ private final Duration pollingInterval;
private final int numShards;
private final String[] shardOwners;
private final long[] currentOffset;
String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
+ Duration pollingInterval,
int numShards,
URI instanceUri)
{
this.producer = producer;
this.chatRoomInfo = new HashMap<>();
+ this.pollingInterval = pollingInterval;
+
this.numShards = numShards;
this.shardOwners = new String[numShards];
this.currentOffset = new long[numShards];
{
try
{
- ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+ ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(pollingInterval);
log.debug("Fetched {} messages", records.count());
for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
properties.getKafka().getInfoChannelTopic(),
producer,
infoChannelConsumer,
+ properties.getKafka().getPollingInterval(),
properties.getKafka().getNumPartitions(),
properties.getKafka().getInstanceUri());
}
dataChannelConsumer,
zoneId,
properties.getKafka().getNumPartitions(),
+ properties.getKafka().getPollingInterval(),
properties.getChatroomBufferSize(),
clock,
infoChannel,