public ApplicationTest()
{
super(
- new StringSerializer(),
- new RecordGenerator<> ()
+ new RecordGenerator()
{
final StringSerializer stringSerializer = new StringSerializer();
final LongSerializer longSerializer = new LongSerializer();
public void generate(
int numberOfMessagesToGenerate,
Set<Integer> poisonPills,
- Consumer<ProducerRecord<String, Bytes>> messageSender)
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
int i = 0;
? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
: new Bytes(longSerializer.serialize(TOPIC, (long)i));
- ProducerRecord<String, Bytes> record =
+ ProducerRecord<Bytes, Bytes> record =
new ProducerRecord<>(
TOPIC,
partition,
- Integer.toString(partition*10+key%2),
+ new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
value);
messageSender.accept(record);