X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Fkafka%2FKafkaServicesConfiguration.java;h=33371279b8c9de92b2ffa06615d128eed74491a5;hb=0f13dc5e88722ca7c238258747041d9663251356;hp=f78beb1074026828fdc10f4998693d070bbfd813;hpb=300fb309b42aefecd475a75c946958e9b9316f7e;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index f78beb10..33371279 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -133,7 +133,8 @@ public class KafkaServicesConfiguration infoChannelConsumer, properties.getKafka().getPollingInterval(), properties.getKafka().getNumPartitions(), - properties.getKafka().getInstanceUri()); + properties.getKafka().getInstanceUri(), + channelMediator); channelMediator.setInfoChannel(infoChannel); return infoChannel; } @@ -148,7 +149,7 @@ public class KafkaServicesConfiguration ChannelMediator channelMediator, ShardingPublisherStrategy shardingPublisherStrategy) { - return new DataChannel( + DataChannel dataChannel = new DataChannel( properties.getInstanceId(), properties.getKafka().getDataChannelTopic(), producer, @@ -160,6 +161,8 @@ public class KafkaServicesConfiguration clock, channelMediator, shardingPublisherStrategy); + channelMediator.setDataChannel(dataChannel); + return dataChannel; } @Bean