X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FInfoChannel.java;h=f3150ce253985072dcbecd398c35053d36a04bf8;hb=0f13dc5e88722ca7c238258747041d9663251356;hp=a6351d0ff52354241c57bf63b3a68c9956682d0a;hpb=300fb309b42aefecd475a75c946958e9b9316f7e;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index a6351d0f..f3150ce2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -39,6 +39,7 @@ public class InfoChannel implements Runnable private final long[] nextOffset; private final Map chatRoomInfo; private final String instanceUri; + private final ChannelMediator channelMediator; private boolean running; @Getter @@ -51,7 +52,8 @@ public class InfoChannel implements Runnable Consumer infoChannelConsumer, Duration pollingInterval, int numShards, - URI instanceUri) + URI instanceUri, + ChannelMediator channelMediator) { log.debug( "Creating InfoChannel for topic {}", @@ -72,6 +74,8 @@ public class InfoChannel implements Runnable .forEach(partition -> this.nextOffset[partition] = -1l); this.instanceUri = instanceUri.toASCIIString(); + + this.channelMediator = channelMediator; } @@ -282,6 +286,7 @@ public class InfoChannel implements Runnable chatRoomId); this.chatRoomInfo.put(chatRoomId, chatRoomInfo); + this.channelMediator.chatRoomCreated(chatRoomInfo); } }