@Override
- public void generate(
- Set<Integer> poisonPills,
- Set<Integer> logicErrors,
+ public int generate(
+ boolean poisonPills,
+ boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
int i = 0;
{
for (int key = 0; key < 10; key++)
{
- if (++i > 100)
- return;
+ i++;
Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
- if (logicErrors.contains(i))
+ if (i == 77)
{
- value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
- }
- if (poisonPills.contains(i))
- {
- value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+ if (logicErrors)
+ {
+ value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+ }
+ if (poisonPills)
+ {
+ value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+ }
}
ProducerRecord<Bytes, Bytes> record =
messageSender.accept(record);
}
}
+
+ return i;
}
});
}
@Test
void commitsCurrentOffsetsOnSuccess()
{
- recordGenerator.generate(100, Set.of(), Set.of(), messageSender);
+ int numberOfGeneratedMessages =
+ recordGenerator.generate(false, false, messageSender);
- await("100 records received")
+ await(numberOfGeneratedMessages + " records received")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> receivedRecords.size() >= 100);
+ .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
@SkipWhenErrorCannotBeGenerated(poisonPill = true)
void commitsOffsetOfErrorForReprocessingOnDeserializationError()
{
- recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);
+ int numberOfGeneratedMessages =
+ recordGenerator.generate(true, false, messageSender);
await("Consumer failed")
.atMost(Duration.ofSeconds(30))
compareToCommitedOffsets(newOffsets);
assertThat(receivedRecords.size())
.describedAs("Received not all sent events")
- .isLessThan(100);
+ .isLessThan(numberOfGeneratedMessages);
assertThatNoException()
.describedAs("Consumer should not be running")
@SkipWhenErrorCannotBeGenerated(logicError = true)
void doesNotCommitOffsetsOnLogicError()
{
- recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);
+ int numberOfGeneratedMessages =
+ recordGenerator.generate(false, true, messageSender);
await("Consumer failed")
.atMost(Duration.ofSeconds(30))
compareToCommitedOffsets(oldOffsets);
assertThat(receivedRecords.size())
.describedAs("Received not all sent events")
- .isLessThan(100);
+ .isLessThan(numberOfGeneratedMessages);
assertThatNoException()
.describedAs("Consumer should not be running")
public interface RecordGenerator
{
- void generate(
- Set<Integer> poisonPills,
- Set<Integer> logicErrors,
+ int generate(
+ boolean poisonPills,
+ boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
- default boolean canGeneratePoisionPill()
+ default boolean canGeneratePoisonPill()
{
return true;
}