fix: Fehlerhafte Erwartung korrigiert
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.apache.kafka.common.utils.Bytes;
6
7 import java.util.function.Consumer;
8 import java.util.stream.IntStream;
9
10
11 public class ApplicationTests extends GenericApplicationTests<String, String>
12 {
13   public ApplicationTests()
14   {
15     super(
16         new RecordGenerator()
17         {
18           final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
19           final String[] dieWilden13 =
20               IntStream
21                   .range(1,14)
22                   .mapToObj(i -> "seeräuber-" + i)
23                   .toArray(i -> new String[i]);
24           final StringSerializer stringSerializer = new StringSerializer();
25           final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START"));
26           final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END"));
27
28           int counter = 0;
29
30
31           @Override
32           public int generate(
33               boolean poisonPills,
34               boolean logicErrors,
35               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
36           {
37             counter = 0;
38
39             for (int i = 0; i < 33; i++)
40             {
41               String seeräuber = dieWilden13[i%13];
42               int number = numbers[i%7];
43
44               Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
45
46               send(key, startMessage, logicErrors, messageSender);
47               for (int message = 1; message <= number; message++)
48               {
49                 Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
50                 send(key, value, logicErrors, messageSender);
51               }
52               send(key, endMessage, logicErrors, messageSender);
53             }
54
55             return counter;
56           }
57
58           void send(
59               Bytes key,
60               Bytes value,
61               boolean logicErrors,
62               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
63           {
64             counter++;
65
66             if (counter == 77)
67             {
68               if (logicErrors)
69               {
70                 value = value.equals(startMessage) ? endMessage : startMessage;
71               }
72             }
73
74             messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
75           }
76
77           @Override
78           public boolean canGeneratePoisonPill()
79           {
80             return false;
81           }
82         });
83   }
84 }