1 package de.juplo.kafka;
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.LongSerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
9 import java.util.function.Consumer;
12 public class ApplicationTest extends GenericApplicationTest<String, Long>
14 public ApplicationTest()
17 new StringSerializer(),
18 new RecordGenerator<> ()
20 final StringSerializer stringSerializer = new StringSerializer();
21 final LongSerializer longSerializer = new LongSerializer();
26 int numberOfMessagesToGenerate,
27 Set<Integer> poisonPills,
28 Consumer<ProducerRecord<String, Bytes>> messageSender)
32 for (int partition = 0; partition < 10; partition++)
34 for (int key = 0; key < 10; key++)
36 if (++i > numberOfMessagesToGenerate)
40 poisonPills.contains(i)
41 ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
42 : new Bytes(longSerializer.serialize(TOPIC, (long)i));
44 ProducerRecord<String, Bytes> record =
48 Integer.toString(partition*10+key%2),
51 messageSender.accept(record);