a1501769255f2be2a0eb788045ec71812b7517b5
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7 import org.springframework.beans.factory.annotation.Autowired;
8
9 import java.util.*;
10
11 import java.util.function.Consumer;
12 import java.util.stream.Collectors;
13 import java.util.stream.IntStream;
14
15 import static org.assertj.core.api.Assertions.assertThat;
16
17
18 @Slf4j
19 public class ApplicationTests extends GenericApplicationTests<String, String>
20 {
21   @Autowired
22   StateRepository stateRepository;
23
24
25   public ApplicationTests()
26   {
27     super(new ApplicationTestRecrodGenerator());
28     ((ApplicationTestRecrodGenerator) recordGenerator).tests = this;
29   }
30
31
32   static class ApplicationTestRecrodGenerator implements RecordGenerator
33   {
34     ApplicationTests tests;
35
36     final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
37     final String[] dieWilden13 =
38       IntStream
39         .range(1, 14)
40         .mapToObj(i -> "seeräuber-" + i)
41         .toArray(i -> new String[i]);
42     final StringSerializer stringSerializer = new StringSerializer();
43     final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
44
45     int counter = 0;
46
47     Map<String, List<AdderResult>> state;
48
49     @Override
50     public int generate(
51       boolean poisonPills,
52       boolean logicErrors,
53       Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
54     {
55       counter = 0;
56       state =
57         Arrays
58           .stream(dieWilden13)
59           .collect(Collectors.toMap(
60             seeräuber -> seeräuber,
61             seeräuber -> new LinkedList()));
62
63       int number[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
64       int message[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
65       int next = 0;
66
67       for (int pass = 0; pass < 333; pass++)
68       {
69         for (int i = 0; i < 13; i++)
70         {
71           String seeräuber = dieWilden13[i];
72           Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
73
74           if (message[i] > number[i])
75           {
76             send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
77             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
78             // Pick next number to calculate
79             number[i] = numbers[next++ % numbers.length];
80             message[i] = 1;
81             log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
82           }
83
84           Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
85           send(key, value, fail(logicErrors, pass, counter), messageSender);
86         }
87       }
88
89       return counter;
90     }
91
92     boolean fail(boolean logicErrors, int pass, int counter)
93     {
94       return logicErrors && pass > 300 && counter % 77 == 0;
95     }
96
97     void send(
98       Bytes key,
99       Bytes value,
100       boolean fail,
101       Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
102     {
103       counter++;
104
105       if (fail)
106       {
107         value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
108       }
109
110       messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
111     }
112
113     @Override
114     public boolean canGeneratePoisonPill()
115     {
116       return false;
117     }
118
119     @Override
120     public void assertBusinessLogic()
121     {
122       for (int i = 0; i < PARTITIONS; i++)
123       {
124         StateDocument stateDocument =
125           tests.stateRepository.findById(Integer.toString(i)).get();
126
127         stateDocument
128           .results
129           .entrySet()
130           .stream()
131           .forEach(entry ->
132           {
133             String user = entry.getKey();
134             List<AdderResult> resultsForUser = entry.getValue();
135
136             for (int j = 0; j < resultsForUser.size(); j++)
137             {
138               if (!(j < state.get(user).size()))
139               {
140                 break;
141               }
142
143               assertThat(resultsForUser.get(j))
144                 .as("Unexpected results calculation %d of user %s", j, user)
145                 .isEqualTo(state.get(user).get(j));
146             }
147
148             assertThat(state.get(user))
149               .as("More results calculated for user %s as expected", user)
150               .containsAll(resultsForUser);
151           });
152       }
153     }
154   }
155 }