ROT: Signatur für `AdderBusinessLogic` und neue Erwartungen formuliert
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.apache.kafka.common.utils.Bytes;
6
7 import java.util.function.Consumer;
8 import java.util.stream.IntStream;
9
10
11 public class ApplicationTests extends GenericApplicationTests<String, String>
12 {
13   public ApplicationTests()
14   {
15     super(
16         new RecordGenerator()
17         {
18           final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
19           final String[] dieWilden13 =
20               IntStream
21                   .range(1,14)
22                   .mapToObj(i -> "seeräuber-" + i)
23                   .toArray(i -> new String[i]);
24           final StringSerializer stringSerializer = new StringSerializer();
25           final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
26
27           int counter = 0;
28
29
30           @Override
31           public int generate(
32               boolean poisonPills,
33               boolean logicErrors,
34               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
35           {
36             counter = 0;
37
38             for (int i = 0; i < 33; i++)
39             {
40               String seeräuber = dieWilden13[i%13];
41               int number = numbers[i%7];
42
43               Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
44
45               for (int message = 1; message <= number; message++)
46               {
47                 Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
48                 send(key, value, logicErrors, messageSender);
49               }
50               send(key, calculateMessage, logicErrors, messageSender);
51             }
52
53             return counter;
54           }
55
56           void send(
57               Bytes key,
58               Bytes value,
59               boolean logicErrors,
60               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
61           {
62             counter++;
63
64             if (counter == 77)
65             {
66               if (logicErrors)
67               {
68                 value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
69               }
70             }
71
72             messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
73           }
74
75           @Override
76           public boolean canGeneratePoisonPill()
77           {
78             return false;
79           }
80         });
81   }
82 }