PartitionStatisticsRepository repository;
Consumer<ConsumerRecord<String, String>> testHandler;
- EndlessConsumer<String, String> endlessConsumer;
+ Wordcount<String, String> wordcount;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
Set<ConsumerRecord<String, String>> receivedRecords;
});
assertThatExceptionOfType(IllegalStateException.class)
- .isThrownBy(() -> endlessConsumer.exitStatus())
+ .isThrownBy(() -> wordcount.exitStatus())
.describedAs("Consumer should still be running");
}
testHandler.accept(record);
};
- endlessConsumer =
+ wordcount =
new EndlessConsumer<>(
executor,
repository,
kafkaConsumer,
captureOffsetAndExecuteTestHandler);
- endlessConsumer.start();
+ wordcount.start();
}
@AfterEach
{
try
{
- endlessConsumer.stop();
+ wordcount.stop();
}
catch (Exception e)
{