+ @Override
+ public int generate(
+ boolean poisonPills,
+ boolean logicErrors,
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+ {
+ counter = 0;
+ state =
+ Arrays
+ .stream(dieWilden13)
+ .collect(Collectors.toMap(
+ seeräuber -> seeräuber,
+ seeräuber -> new LinkedList()));
+
+ int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
+ int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
+ int next = 0;
+
+ for (int pass = 0; pass < 333; pass++)
+ {
+ for (int i = 0; i<13; i++)
+ {
+ String seeräuber = dieWilden13[i];
+ Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
+
+ if (message[i] > number[i])
+ {
+ send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
+ state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
+ // Pick next number to calculate
+ number[i] = numbers[next++%numbers.length];
+ message[i] = 1;
+ log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);