X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=f78beb1074026828fdc10f4998693d070bbfd813;hb=300fb309b42aefecd475a75c946958e9b9316f7e;hp=6b7c15662c33992f8a36b78dedcfea178a3adc4a;hpb=f62fc04de07b8088b90defebdb8fdf1319b9e66a;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 6b7c1566..f78beb10 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -124,15 +124,18 @@ public class KafkaServicesConfiguration InfoChannel infoChannel( ChatBackendProperties properties, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + ChannelMediator channelMediator) { - return new InfoChannel( + InfoChannel infoChannel = new InfoChannel( properties.getKafka().getInfoChannelTopic(), producer, infoChannelConsumer, properties.getKafka().getPollingInterval(), properties.getKafka().getNumPartitions(), properties.getKafka().getInstanceUri()); + channelMediator.setInfoChannel(infoChannel); + return infoChannel; } @Bean @@ -142,7 +145,7 @@ public class KafkaServicesConfiguration Consumer dataChannelConsumer, ZoneId zoneId, Clock clock, - InfoChannel infoChannel, + ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) { return new DataChannel( @@ -155,10 +158,16 @@ public class KafkaServicesConfiguration properties.getKafka().getPollingInterval(), properties.getChatroomBufferSize(), clock, - infoChannel, + channelMediator, shardingPublisherStrategy); } + @Bean + ChannelMediator channelMediator() + { + return new ChannelMediator(); + } + @Bean Producer producer( Properties defaultProducerProperties,