infoChannelConsumer,
properties.getKafka().getPollingInterval(),
properties.getKafka().getNumPartitions(),
- properties.getKafka().getInstanceUri());
+ properties.getKafka().getInstanceUri(),
+ channelMediator);
channelMediator.setInfoChannel(infoChannel);
return infoChannel;
}
ChannelMediator channelMediator,
ShardingPublisherStrategy shardingPublisherStrategy)
{
- return new DataChannel(
+ DataChannel dataChannel = new DataChannel(
properties.getInstanceId(),
properties.getKafka().getDataChannelTopic(),
producer,
clock,
channelMediator,
shardingPublisherStrategy);
+ channelMediator.setDataChannel(dataChannel);
+ return dataChannel;
}
@Bean