@Autowired
ApplicationProperties properties;
@Autowired
- ExecutorService executor;
+ EndlessConsumer endlessConsumer;
+ @Autowired
+ RecordHandler recordHandler;
- Consumer<ConsumerRecord<String, Long>> testHandler;
- EndlessConsumer<String, Long> endlessConsumer;
Map<TopicPartition, Long> oldOffsets;
Map<TopicPartition, Long> newOffsets;
+ Set<ConsumerRecord<String, Long>> receivedRecords;
/** Tests methods */
await("Consumer failed")
.atMost(Duration.ofSeconds(30))
- .until(() -> !endlessConsumer.running());
+ .untilAsserted(() -> checkSeenOffsetsForProgress());
- checkSeenOffsetsForProgress();
- compareToCommitedOffsets(newOffsets);
-
- endlessConsumer.start();
- await("Consumer failed")
- .atMost(Duration.ofSeconds(30))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
compareToCommitedOffsets(newOffsets);
+ assertThat(receivedRecords.size())
+ .describedAs("Received not all sent events")
+ .isLessThan(100);
}
newOffsets.put(tp, offset - 1);
});
- Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
+ recordHandler.captureOffsets =
record ->
+ {
++ receivedRecords.add(record);
newOffsets.put(
new TopicPartition(record.topic(), record.partition()),
record.offset());
- receivedRecords.add(record);
- testHandler.accept(record);
+ };
- endlessConsumer =
- new EndlessConsumer<>(
- executor,
- properties.getClientId(),
- properties.getTopic(),
- kafkaConsumer,
- captureOffsetAndExecuteTestHandler);
-
endlessConsumer.start();
}