+
+ Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
+ send(key, value, fail(logicErrors, pass, counter), messageSender);
+ }
+ }
+
+ return counter;
+ }
+
+ boolean fail (boolean logicErrors, int pass, int counter)
+ {
+ return logicErrors && pass > 300 && counter%77 == 0;
+ }
+
+ void send(
+ Bytes key,
+ Bytes value,
+ boolean fail,
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+ {
+ counter++;
+
+ if (fail)
+ {
+ value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+ }
+
+ messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
+ }
+
+ @Override
+ public boolean canGeneratePoisonPill()
+ {
+ return false;
+ }
+
+ @Override
+ public void assertBusinessLogic()
+ {
+ for (int i=0; i<PARTITIONS; i++)
+ {
+ StateDocument stateDocument =
+ tests.stateRepository.findById(Integer.toString(i)).get();
+
+ stateDocument
+ .results.entrySet().stream()
+ .forEach(entry ->
+ {
+ String user = entry.getKey();
+ List<AdderResult> resultsForUser = entry.getValue();
+
+ assertThat(state.get(user))
+ .as("Unexpected results for user %s", user)
+ .containsExactlyElementsOf(resultsForUser);
+ });
+ }
+ }