final StringSerializer stringSerializer = new StringSerializer();
final LongSerializer longSerializer = new LongSerializer();
+ int numberOfMessages;
+ int numberOfLogicErrors;
+ int numberOfPoisonPills;
+
@Override
- public int generate(
+ public void generate(
boolean poisonPills,
boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
- int i = 0;
-
for (int partition = 0; partition < 10; partition++)
{
for (int key = 0; key < 10000; key++)
{
- i++;
+ numberOfMessages++;
- Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
- if (i == 99977)
+ Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long) numberOfMessages));
+ if (numberOfMessages == 99977)
{
if (logicErrors)
{
value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+ numberOfLogicErrors++;
}
if (poisonPills)
{
value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+ numberOfPoisonPills++;
}
}
ProducerRecord<Bytes, Bytes> record =
- new ProducerRecord<>(
- TOPIC,
- partition,
- new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
- value);
+ new ProducerRecord<>(
+ TOPIC,
+ partition,
+ new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(partition * 10 + key % 2))),
+ value);
messageSender.accept(record);
}
}
+ }
+
+ @Override
+ public int getNumberOfMessages()
+ {
+ return numberOfMessages;
+ }
- return i;
+ @Override
+ public int getNumberOfPoisonPills()
+ {
+ return numberOfPoisonPills;
+ }
+
+ @Override
+ public int getNumberOfLogicErrors()
+ {
+ return numberOfLogicErrors;
}
});
}
@Test
void commitsCurrentOffsetsOnSuccess() throws Exception
{
- int numberOfGeneratedMessages =
- recordGenerator.generate(false, false, messageSender);
+ recordGenerator.generate(false, false, messageSender);
+
+ int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
await(numberOfGeneratedMessages + " records received")
.atMost(Duration.ofSeconds(30))
@SkipWhenErrorCannotBeGenerated(poisonPill = true)
void commitsOffsetOfErrorForReprocessingOnDeserializationError()
{
- int numberOfGeneratedMessages =
- recordGenerator.generate(true, false, messageSender);
+ recordGenerator.generate(true, false, messageSender);
+
+ int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
await("Consumer failed")
.atMost(Duration.ofSeconds(30))
@SkipWhenErrorCannotBeGenerated(logicError = true)
void doesNotCommitOffsetsOnLogicError()
{
- int numberOfGeneratedMessages =
- recordGenerator.generate(false, true, messageSender);
+ recordGenerator.generate(false, true, messageSender);
+
+ int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
await("Consumer failed")
.atMost(Duration.ofSeconds(30))
public interface RecordGenerator
{
- int generate(
+ void generate(
boolean poisonPills,
boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
+ int getNumberOfMessages();
+ int getNumberOfPoisonPills();
+ int getNumberOfLogicErrors();
+
default boolean canGeneratePoisonPill()
{
return true;