Der Test verwendet die `@Bean` von `EndlessConsumer`
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7 import org.springframework.beans.factory.annotation.Autowired;
8
9 import java.util.*;
10 import java.util.function.Consumer;
11 import java.util.stream.Collectors;
12 import java.util.stream.IntStream;
13
14 import static org.assertj.core.api.Assertions.assertThat;
15
16
17 @Slf4j
18 public class ApplicationTests extends GenericApplicationTests<String, Message>
19 {
20   @Autowired
21   StateRepository stateRepository;
22
23
24   public ApplicationTests()
25   {
26     super(new ApplicationTestRecrodGenerator());
27     ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
28   }
29
30
31   static class ApplicationTestRecrodGenerator implements RecordGenerator
32   {
33     ApplicationTests tests;
34
35     final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
36     final String[] dieWilden13 =
37         IntStream
38             .range(1, 14)
39             .mapToObj(i -> "seeräuber-" + i)
40             .toArray(i -> new String[i]);
41     final StringSerializer stringSerializer = new StringSerializer();
42     final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
43
44     int counter = 0;
45
46     Map<String, List<AdderResult>> state;
47
48     @Override
49     public int generate(
50         boolean poisonPills,
51         boolean logicErrors,
52         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
53     {
54       counter = 0;
55       state =
56           Arrays
57               .stream(dieWilden13)
58               .collect(Collectors.toMap(
59                   seeräuber -> seeräuber,
60                   seeräuber -> new LinkedList()));
61
62       int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
63       int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
64       int next = 0;
65
66       for (int pass = 0; pass < 333; pass++)
67       {
68         for (int i = 0; i<13; i++)
69         {
70           String seeräuber = dieWilden13[i];
71           Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
72
73           if (message[i] > number[i])
74           {
75             send(
76               key,
77               calculateMessage,
78               Message.Type.CALC,
79               poisonPill(poisonPills, pass, counter),
80               logicError(logicErrors, pass, counter),
81               messageSender);
82             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
83             // Pick next number to calculate
84             number[i] = numbers[next++%numbers.length];
85             message[i] = 1;
86             log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
87           }
88
89           send(
90             key,
91             new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
92             Message.Type.ADD,
93             poisonPill(poisonPills, pass, counter),
94             logicError(logicErrors, pass, counter),
95             messageSender);
96         }
97       }
98
99       return counter;
100     }
101
102     boolean poisonPill (boolean poisonPills, int pass, int counter)
103     {
104       return poisonPills && pass > 300 && counter%99 == 0;
105     }
106
107     boolean logicError(boolean logicErrors, int pass, int counter)
108     {
109       return logicErrors && pass > 300 && counter%77 == 0;
110     }
111
112     void send(
113         Bytes key,
114         Bytes value,
115         Message.Type type,
116         boolean poisonPill,
117         boolean logicError,
118         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
119     {
120       counter++;
121
122       if (logicError)
123       {
124         value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
125       }
126       if (poisonPill)
127       {
128         value = new Bytes("BOOM!".getBytes());
129       }
130
131       ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
132       record.headers().add("__TypeId__", type.toString().getBytes());
133       messageSender.accept(record);
134     }
135
136     @Override
137     public void assertBusinessLogic()
138     {
139       for (int i=0; i<PARTITIONS; i++)
140       {
141         StateDocument stateDocument =
142             tests.stateRepository.findById(Integer.toString(i)).get();
143
144         stateDocument
145             .results
146             .entrySet()
147             .stream()
148             .forEach(entry ->
149             {
150               String user = entry.getKey();
151               List<AdderResult> resultsForUser = entry.getValue();
152
153               for (int j=0; j < resultsForUser.size(); j++)
154               {
155                 if (!(j < state.get(user).size()))
156                 {
157                   break;
158                 }
159
160                 assertThat(resultsForUser.get(j))
161                     .as("Unexpected results calculation %d of user %s", j, user)
162                     .isEqualTo(state.get(user).get(j));
163               }
164
165               assertThat(state.get(user))
166                   .as("More results calculated for user %s as expected", user)
167                   .containsAll(resultsForUser);
168             });
169       }
170     }
171   }
172 }