1 package de.juplo.kafka;
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.LongSerializer;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
9 import java.util.function.Consumer;
// Concrete test class that binds GenericApplicationTest to String keys and Long values.
// NOTE(review): only part of this class is visible in this excerpt (braces, the method
// header and several statements are missing) - the comments describe the visible lines only.
12 public class ApplicationTest extends GenericApplicationTest<String, Long>
// Public no-argument constructor; its body is not visible in this excerpt.
14 public ApplicationTest()
// Serializers used further down to turn String and Long payloads into raw bytes.
19 final StringSerializer stringSerializer = new StringSerializer();
20 final LongSerializer longSerializer = new LongSerializer();
// Parameters of the message-generating method (its header line is not visible here):
//  - numberOfMessagesToGenerate: upper bound the running counter i is checked against
//  - poisonPills: message numbers whose value is replaced by the string "BOOM!"
//  - messageSender: callback that receives every generated record
25 int numberOfMessagesToGenerate,
26 Set<Integer> poisonPills,
27 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
// Outer loop: partition indices 0..9.
31 for (int partition = 0; partition < 10; partition++)
// Inner loop: 10 iterations per partition, so at most 100 iterations in total.
33 for (int key = 0; key < 10; key++)
// The counter is incremented BEFORE the comparison, so message numbers start at 1.
// NOTE(review): the statement guarded by this condition is not visible - presumably it
// ends the generation once the requested number of messages is reached; confirm.
35 if (++i > numberOfMessagesToGenerate)
// Value selection: a message number listed in poisonPills gets the serialized string
// "BOOM!" as value, every other message gets its own number serialized as a long.
// NOTE(review): presumably "BOOM!" is meant to be undeserializable as Long - confirm.
39 poisonPills.contains(i)
40 ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
41 : new Bytes(longSerializer.serialize(TOPIC, (long)i));
43 ProducerRecord<Bytes, Bytes> record =
// Key: '%' binds tighter than '+', so this evaluates to partition*10 + (key%2), i.e.
// only two distinct keys per partition, serialized as decimal strings.
// NOTE(review): confirm this is intended rather than (partition*10+key)%2.
47 new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
// Hand the finished record to the caller-supplied sender.
50 messageSender.accept(record);