X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=a6351d0ff52354241c57bf63b3a68c9956682d0a;hb=13f86063f851fc2c4ad6de56c8edb78bff9d0592;hp=f5d5253c15ddd4b437c273d61428422150d7a6b9;hpb=e2933c8acd7ffd1c3a8530b0d34621c183ef09fa;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index f5d5253c..a6351d0f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -32,6 +32,7 @@ public class InfoChannel implements Runnable private final String topic; private final Producer producer; private final Consumer consumer; + private final Duration pollingInterval; private final int numShards; private final String[] shardOwners; private final long[] currentOffset; @@ -48,6 +49,7 @@ public class InfoChannel implements Runnable String topic, Producer producer, Consumer infoChannelConsumer, + Duration pollingInterval, int numShards, URI instanceUri) { @@ -59,6 +61,8 @@ public class InfoChannel implements Runnable this.producer = producer; this.chatRoomInfo = new HashMap<>(); + this.pollingInterval = pollingInterval; + this.numShards = numShards; this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; @@ -191,7 +195,7 @@ public class InfoChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(pollingInterval); log.debug("Fetched {} messages", records.count()); for (ConsumerRecord record : records) {