refactor: Inline-Klasse in `ApplicationTests` ist jetzt statische Klasse
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.apache.kafka.common.utils.Bytes;
6
7 import java.util.function.Consumer;
8 import java.util.stream.IntStream;
9
10
11 public class ApplicationTests extends GenericApplicationTests<String, String>
12 {
13   public ApplicationTests()
14   {
15     super(new ApplicationTestRecrodGenerator());
16   }
17
18
19   static class ApplicationTestRecrodGenerator implements RecordGenerator
20   {
21     final int[] numbers = {1, 7, 3, 2, 33, 6, 11};
22     final String[] dieWilden13 =
23         IntStream
24             .range(1, 14)
25             .mapToObj(i -> "seeräuber-" + i)
26             .toArray(i -> new String[i]);
27     final StringSerializer stringSerializer = new StringSerializer();
28     final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
29
30     int counter = 0;
31
32     @Override
33     public int generate(
34         boolean poisonPills,
35         boolean logicErrors,
36         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
37     {
38       counter = 0;
39
40       for (int i = 0; i < 33; i++)
41       {
42         String seeräuber = dieWilden13[i % 13];
43         int number = numbers[i % 7];
44
45         Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
46
47         for (int message = 1; message <= number; message++)
48         {
49           Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
50           send(key, value, logicErrors, messageSender);
51         }
52         send(key, calculateMessage, logicErrors, messageSender);
53       }
54
55       return counter;
56     }
57
58     void send(
59         Bytes key,
60         Bytes value,
61         boolean logicErrors,
62         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
63     {
64       counter++;
65
66       if (counter == 77)
67       {
68         if (logicErrors)
69         {
70           value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
71         }
72       }
73
74       messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
75     }
76
77     @Override
78     public boolean canGeneratePoisonPill()
79     {
80       return false;
81     }
82   }
83 }