import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.assertj.core.api.Assertions;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.*;
@Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, String>
+public class ApplicationTests extends GenericApplicationTests<String, Message>
{
@Autowired
StateRepository stateRepository;
.mapToObj(i -> "seeräuber-" + i)
.toArray(i -> new String[i]);
final StringSerializer stringSerializer = new StringSerializer();
- final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+ final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
- int counter = 0;
+ int counterMessages;
+ int counterPoisonPills;
+ int counterLogicErrors;
Map<String, List<AdderResult>> state;
@Override
- public int generate(
+ public void generate(
boolean poisonPills,
boolean logicErrors,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
- counter = 0;
+ counterMessages = 0;
+ counterPoisonPills = 0;
+ counterLogicErrors = 0;
+
state =
Arrays
.stream(dieWilden13)
if (message[i] > number[i])
{
- send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
+ send(
+ key,
+ calculateMessage,
+ Message.Type.CALC,
+ poisonPill(poisonPills, pass, counterMessages),
+ logicError(logicErrors, pass, counterMessages),
+ messageSender);
state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
// Pick next number to calculate
number[i] = numbers[next++%numbers.length];
log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
}
- Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
- send(key, value, fail(logicErrors, pass, counter), messageSender);
+ send(
+ key,
+ new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
+ Message.Type.ADD,
+ poisonPill(poisonPills, pass, counterMessages),
+ logicError(logicErrors, pass, counterMessages),
+ messageSender);
}
}
+ }
- return counter;
+ @Override
+ public int getNumberOfMessages()
+ {
+ return counterMessages;
}
- boolean fail (boolean logicErrors, int pass, int counter)
+ @Override
+ public int getNumberOfPoisonPills()
+ {
+ return counterPoisonPills;
+ }
+
+ @Override
+ public int getNumberOfLogicErrors()
+ {
+ return counterLogicErrors;
+ }
+
+ boolean poisonPill (boolean poisonPills, int pass, int counter)
+ {
+ return poisonPills && pass > 300 && counter%99 == 0;
+ }
+
+ boolean logicError(boolean logicErrors, int pass, int counter)
{
return logicErrors && pass > 300 && counter%77 == 0;
}
void send(
Bytes key,
Bytes value,
- boolean fail,
+ Message.Type type,
+ boolean poisonPill,
+ boolean logicError,
Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
- counter++;
+ counterMessages++;
- if (fail)
+ if (logicError)
{
- value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+ value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+ counterLogicErrors++;
+ }
+ if (poisonPill)
+ {
+ value = new Bytes("BOOM!".getBytes());
+ counterPoisonPills++;
}
- messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
- }
-
- @Override
- public boolean canGeneratePoisonPill()
- {
- return false;
+ ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
+ record.headers().add("__TypeId__", type.toString().getBytes());
+ messageSender.accept(record);
}
@Override
}
assertThat(resultsForUser.get(j))
- .as("Unexpected results calculation %i of user %s", j, user)
+ .as("Unexpected results calculation %d of user %s", j, user)
.isEqualTo(state.get(user).get(j));
}