import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
- TestRecordHandler<K, V> recordHandler;
+ TestRecordHandler recordHandler;
@Autowired
- EndlessConsumer<K, V> endlessConsumer;
+ EndlessConsumer endlessConsumer;
KafkaProducer<Bytes, Bytes> testRecordProducer;
KafkaConsumer<Bytes, Bytes> offsetConsumer;
await(numberOfGeneratedMessages + " records received")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
+ .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
checkSeenOffsetsForProgress();
assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
- assertThat(recordHandler.receivedRecords.size())
+ assertThat(recordHandler.receivedMessages)
.describedAs("Received not all sent events")
.isLessThan(numberOfGeneratedMessages);
@Test
@SkipWhenErrorCannotBeGenerated(logicError = true)
- void doesNotCommitOffsetsOnLogicError()
+ void commitsOffsetsOfUnseenRecordsOnLogicError()
{
int numberOfGeneratedMessages =
recordGenerator.generate(false, true, messageSender);
.until(() -> !endlessConsumer.running());
checkSeenOffsetsForProgress();
- assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
+ assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
endlessConsumer.start();
await("Consumer failed")
.pollInterval(Duration.ofSeconds(1))
.until(() -> !endlessConsumer.running());
- assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
+ assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
assertThatNoException()
.describedAs("Consumer should not be running")
oldOffsets = new HashMap<>();
recordHandler.seenOffsets = new HashMap<>();
- recordHandler.receivedRecords = new HashSet<>();
+ recordHandler.receivedMessages = 0;
doForCurrentOffsets((tp, offset) ->
{