X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=f3150ce253985072dcbecd398c35053d36a04bf8;hb=eaec0e92a1887c6b1c0059de1b5db44039dc1dd4;hp=2df7573cc684c27497d4f00b2ab6a62930c308f8;hpb=35bbca17e02ae13905905ea12e58993436c9df9f;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 2df7573c..f3150ce2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -32,12 +32,14 @@ public class InfoChannel implements Runnable private final String topic; private final Producer producer; private final Consumer consumer; + private final Duration pollingInterval; private final int numShards; private final String[] shardOwners; private final long[] currentOffset; private final long[] nextOffset; private final Map chatRoomInfo; private final String instanceUri; + private final ChannelMediator channelMediator; private boolean running; @Getter @@ -48,7 +50,10 @@ public class InfoChannel implements Runnable String topic, Producer producer, Consumer infoChannelConsumer, - URI instanceUri) + Duration pollingInterval, + int numShards, + URI instanceUri, + ChannelMediator channelMediator) { log.debug( "Creating InfoChannel for topic {}", @@ -58,9 +63,9 @@ public class InfoChannel implements Runnable this.producer = producer; this.chatRoomInfo = new HashMap<>(); - this.numShards = consumer - .partitionsFor(topic) - .size(); + this.pollingInterval = pollingInterval; + + this.numShards = numShards; this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; @@ -69,6 +74,8 @@ public class InfoChannel implements Runnable .forEach(partition -> this.nextOffset[partition] = -1l); this.instanceUri = instanceUri.toASCIIString(); + + this.channelMediator = channelMediator; } @@ -192,7 +199,7 @@ public class InfoChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + ConsumerRecords records = consumer.poll(pollingInterval); log.debug("Fetched {} messages", records.count()); for (ConsumerRecord record : records) { @@ -279,6 +286,7 @@ public class InfoChannel implements Runnable chatRoomId); this.chatRoomInfo.put(chatRoomId, chatRoomInfo); + this.channelMediator.chatRoomCreated(chatRoomInfo); } }