final StringSerializer stringSerializer = new StringSerializer();
final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
- int counter = 0;
+ int counterMessages;
+ int counterPoisonPills;
+ int counterLogicErrors;
Map<String, List<AdderResult>> state;
@Override
- public int generate(
+ public void generate(
boolean poisonPills,
boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
- counter = 0;
+ counterMessages = 0;
+ counterPoisonPills = 0;
+ counterLogicErrors = 0;
+
state =
Arrays
.stream(dieWilden13)
key,
calculateMessage,
Message.Type.CALC,
- poisonPill(poisonPills, pass, counter),
- logicError(logicErrors, pass, counter),
+ poisonPill(poisonPills, pass, counterMessages),
+ logicError(logicErrors, pass, counterMessages),
messageSender);
state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
// Pick next number to calculate
key,
new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
Message.Type.ADD,
- poisonPill(poisonPills, pass, counter),
- logicError(logicErrors, pass, counter),
+ poisonPill(poisonPills, pass, counterMessages),
+ logicError(logicErrors, pass, counterMessages),
messageSender);
}
}
+ }
+
+ @Override
+ public int getNumberOfMessages()
+ {
+ return counterMessages;
+ }
- return counter;
+ @Override
+ public int getNumberOfPoisonPills()
+ {
+ return counterPoisonPills;
+ }
+
+ @Override
+ public int getNumberOfLogicErrors()
+ {
+ return counterLogicErrors;
}
boolean poisonPill (boolean poisonPills, int pass, int counter)
boolean logicError,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
- counter++;
+ counterMessages++;
if (logicError)
{
value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+ counterLogicErrors++;
}
if (poisonPill)
{
value = new Bytes("BOOM!".getBytes());
+ counterPoisonPills++;
}
ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
@Test
void commitsCurrentOffsetsOnSuccess() throws Exception
{
- int numberOfGeneratedMessages =
- recordGenerator.generate(false, false, messageSender);
+ recordGenerator.generate(false, false, messageSender);
+
+ int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
await(numberOfGeneratedMessages + " records received")
.atMost(Duration.ofSeconds(30))
@SkipWhenErrorCannotBeGenerated(poisonPill = true)
void commitsOffsetOfErrorForReprocessingOnDeserializationError()
{
- int numberOfGeneratedMessages =
- recordGenerator.generate(true, false, messageSender);
+ recordGenerator.generate(true, false, messageSender);
+
+ int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
await("Consumer failed")
.atMost(Duration.ofSeconds(30))
@SkipWhenErrorCannotBeGenerated(logicError = true)
void commitsOffsetsOfUnseenRecordsOnLogicError()
{
- int numberOfGeneratedMessages =
- recordGenerator.generate(false, true, messageSender);
+ recordGenerator.generate(false, true, messageSender);
+
+ int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
await("Consumer failed")
.atMost(Duration.ofSeconds(30))
public interface RecordGenerator
{
- int generate(
+ void generate(
boolean poisonPills,
boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
+ int getNumberOfMessages();
+ int getNumberOfPoisonPills();
+ int getNumberOfLogicErrors();
+
default boolean canGeneratePoisonPill()
{
return true;