PartitionStatisticsRepository repository;
Consumer<ConsumerRecord<String, String>> testHandler;
- EndlessConsumer<String, String> endlessConsumer;
+ EndlessConsumer endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
Set<ConsumerRecord<String, String>> receivedRecords;
};
endlessConsumer =
- new EndlessConsumer<>(
+ new EndlessConsumer(
executor,
repository,
properties.getClientId(),
properties.getTopic(),
Clock.systemDefaultZone(),
properties.getCommitInterval(),
- kafkaConsumer,
- captureOffsetAndExecuteTestHandler);
+ kafkaConsumer);
endlessConsumer.start();
}