private void sendValidMessage(int partition)
{
- kafkaTemplate.send(TOPIC, partition, "EGAL", serializer.serialize(TOPIC, (long)partition));
+ send(partition, partition);
}
private void sendNonDeserializableMessage(int partition)
{
- kafkaTemplate.send(TOPIC, partition, "EGAL", "BOOM!".getBytes());
+ send(partition, "BOOM!".getBytes());
}
private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
{
- kafkaTemplate.send(
- TOPIC,
- partition,
- "EGAL",
- serializer.serialize(TOPIC, (long)VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION));
+ send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
}
+ private void send(int partition, long message)
+ {
+ send(partition, serializer.serialize(TOPIC, message));
+ }
+
+ private void send(int partition, byte[] bytes)
+ {
+ kafkaTemplate.send(TOPIC, partition, "EGAL", bytes);
+ }
public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;