1 package de.juplo.kafka;
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.apache.kafka.common.utils.Bytes;
7 import java.util.function.Consumer;
8 import java.util.stream.IntStream;
11 public class ApplicationTests extends GenericApplicationTests<String, String>
13 public ApplicationTests()
18 final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
19 final String[] dieWilden13 =
22 .mapToObj(i -> "seeräuber-" + i)
23 .toArray(i -> new String[i]);
24 final StringSerializer stringSerializer = new StringSerializer();
25 final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START"));
26 final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END"));
35 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
39 for (int i = 0; i < 33; i++)
41 String seeräuber = dieWilden13[i%13];
42 int number = numbers[i%7];
44 Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
46 send(key, startMessage, logicErrors, messageSender);
47 for (int message = 1; message <= number; message++)
49 Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
50 send(key, value, logicErrors, messageSender);
52 send(key, endMessage, logicErrors, messageSender);
62 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
70 value = value.equals(startMessage) ? endMessage : startMessage;
74 messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
78 public boolean canGeneratePoisonPill()