1 package de.juplo.kafka;
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.apache.kafka.common.utils.Bytes;
7 import java.util.function.Consumer;
8 import java.util.stream.IntStream;
11 public class ApplicationTests extends GenericApplicationTests<String, String>
13 public ApplicationTests()
18 final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
19 final String[] dieWilden13 =
22 .mapToObj(i -> "seeräuber-" + i)
23 .toArray(i -> new String[i]);
24 final StringSerializer stringSerializer = new StringSerializer();
25 final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
34 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
38 for (int i = 0; i < 33; i++)
40 String seeräuber = dieWilden13[i%13];
41 int number = numbers[i%7];
43 Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
45 for (int message = 1; message <= number; message++)
47 Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
48 send(key, value, logicErrors, messageSender);
50 send(key, calculateMessage, logicErrors, messageSender);
60 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
68 value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
72 messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
76 public boolean canGeneratePoisonPill()