Service ergänzt, der das Dead-Letter-Topic ausliest
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7 import org.springframework.beans.factory.annotation.Autowired;
8
9 import java.util.*;
10 import java.util.function.Consumer;
11 import java.util.stream.Collectors;
12 import java.util.stream.IntStream;
13
14 import static org.assertj.core.api.Assertions.assertThat;
15
16
17 @Slf4j
18 public class ApplicationTests extends GenericApplicationTests<String, Message>
19 {
20   @Autowired
21   StateRepository stateRepository;
22
23
24   public ApplicationTests()
25   {
26     super(new ApplicationTestRecrodGenerator());
27     ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
28   }
29
30
31   static class ApplicationTestRecrodGenerator implements RecordGenerator
32   {
33     ApplicationTests tests;
34
35     final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
36     final String[] dieWilden13 =
37         IntStream
38             .range(1, 14)
39             .mapToObj(i -> "seeräuber-" + i)
40             .toArray(i -> new String[i]);
41     final StringSerializer stringSerializer = new StringSerializer();
42     final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
43
44     int counterMessages;
45     int counterPoisonPills;
46     int counterLogicErrors;
47
48     Map<String, List<AdderResult>> state;
49
50     @Override
51     public void generate(
52         boolean poisonPills,
53         boolean logicErrors,
54         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
55     {
56       counterMessages = 0;
57       counterPoisonPills = 0;
58       counterLogicErrors = 0;
59
60       state =
61           Arrays
62               .stream(dieWilden13)
63               .collect(Collectors.toMap(
64                   seeräuber -> seeräuber,
65                   seeräuber -> new LinkedList()));
66
67       int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
68       int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
69       int next = 0;
70
71       for (int pass = 0; pass < 333; pass++)
72       {
73         for (int i = 0; i<13; i++)
74         {
75           String seeräuber = dieWilden13[i];
76           Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
77
78           if (message[i] > number[i])
79           {
80             send(
81               key,
82               calculateMessage,
83               Message.Type.CALC,
84               poisonPill(poisonPills, pass, counterMessages),
85               logicError(logicErrors, pass, counterMessages),
86               messageSender);
87             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
88             // Pick next number to calculate
89             number[i] = numbers[next++%numbers.length];
90             message[i] = 1;
91             log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
92           }
93
94           send(
95             key,
96             new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
97             Message.Type.ADD,
98             poisonPill(poisonPills, pass, counterMessages),
99             logicError(logicErrors, pass, counterMessages),
100             messageSender);
101         }
102       }
103     }
104
105     @Override
106     public int getNumberOfMessages()
107     {
108       return counterMessages;
109     }
110
111     @Override
112     public int getNumberOfPoisonPills()
113     {
114       return counterPoisonPills;
115     }
116
117     @Override
118     public int getNumberOfLogicErrors()
119     {
120       return counterLogicErrors;
121     }
122
123     boolean poisonPill (boolean poisonPills, int pass, int counter)
124     {
125       return poisonPills && pass > 300 && counter%99 == 0;
126     }
127
128     boolean logicError(boolean logicErrors, int pass, int counter)
129     {
130       return logicErrors && pass > 300 && counter%77 == 0;
131     }
132
133     void send(
134         Bytes key,
135         Bytes value,
136         Message.Type type,
137         boolean poisonPill,
138         boolean logicError,
139         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
140     {
141       counterMessages++;
142
143       if (logicError)
144       {
145         value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
146         counterLogicErrors++;
147       }
148       if (poisonPill)
149       {
150         value = new Bytes("BOOM!".getBytes());
151         counterPoisonPills++;
152       }
153
154       ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
155       record.headers().add("__TypeId__", type.toString().getBytes());
156       messageSender.accept(record);
157     }
158
159     @Override
160     public void assertBusinessLogic()
161     {
162       for (int i=0; i<PARTITIONS; i++)
163       {
164         StateDocument stateDocument =
165             tests.stateRepository.findById(Integer.toString(i)).get();
166
167         stateDocument
168             .results
169             .entrySet()
170             .stream()
171             .forEach(entry ->
172             {
173               String user = entry.getKey();
174               List<AdderResult> resultsForUser = entry.getValue();
175
176               for (int j=0; j < resultsForUser.size(); j++)
177               {
178                 if (!(j < state.get(user).size()))
179                 {
180                   break;
181                 }
182
183                 assertThat(resultsForUser.get(j))
184                     .as("Unexpected results calculation %d of user %s", j, user)
185                     .isEqualTo(state.get(user).get(j));
186               }
187
188               assertThat(state.get(user))
189                   .as("More results calculated for user %s as expected", user)
190                   .containsAll(resultsForUser);
191             });
192       }
193     }
194   }
195 }