@Autowired
PartitionStatisticsRepository repository;
@Autowired
- WordcountRebalanceListener wordcountRebalanceListener;
+ SumRebalanceListener sumRebalanceListener;
@Autowired
- WordcountRecordHandler wordcountRecordHandler;
+ SumRecordHandler sumRecordHandler;
EndlessConsumer<String, String> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
});
TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
- new TestRecordHandler<String, String>(wordcountRecordHandler) {
+ new TestRecordHandler<String, String>(sumRecordHandler) {
@Override
public void onNewRecord(ConsumerRecord<String, String> record)
{
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- wordcountRebalanceListener,
+ sumRebalanceListener,
captureOffsetAndExecuteTestHandler);
endlessConsumer.start();