Vorlage vereinfacht: Rebalance-Listener entfernt
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7 import org.springframework.beans.factory.annotation.Autowired;
8
9 import java.util.*;
10 import java.util.function.Consumer;
11 import java.util.stream.Collectors;
12 import java.util.stream.IntStream;
13
14 import static org.assertj.core.api.Assertions.assertThat;
15
16
17 @Slf4j
18 public class ApplicationTests extends GenericApplicationTests<String, String>
19 {
20   @Autowired
21   StateRepository stateRepository;
22
23
24   public ApplicationTests()
25   {
26     super(new ApplicationTestRecrodGenerator());
27     ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
28   }
29
30
31   static class ApplicationTestRecrodGenerator implements RecordGenerator
32   {
33     ApplicationTests tests;
34
35     final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
36     final String[] dieWilden13 =
37         IntStream
38             .range(1, 14)
39             .mapToObj(i -> "seeräuber-" + i)
40             .toArray(i -> new String[i]);
41     final StringSerializer stringSerializer = new StringSerializer();
42     final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
43
44     int counter = 0;
45
46     Map<String, List<AdderResult>> state;
47
48     @Override
49     public int generate(
50         boolean poisonPills,
51         boolean logicErrors,
52         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
53     {
54       counter = 0;
55       state =
56           Arrays
57               .stream(dieWilden13)
58               .collect(Collectors.toMap(
59                   seeräuber -> seeräuber,
60                   seeräuber -> new LinkedList()));
61
62       int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
63       int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
64       int next = 0;
65
66       for (int pass = 0; pass < 333; pass++)
67       {
68         for (int i = 0; i<13; i++)
69         {
70           String seeräuber = dieWilden13[i];
71           Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
72
73           if (message[i] > number[i])
74           {
75             send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
76             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
77             // Pick next number to calculate
78             number[i] = numbers[next++%numbers.length];
79             message[i] = 1;
80             log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
81           }
82
83           Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
84           send(key, value, fail(logicErrors, pass, counter), messageSender);
85         }
86       }
87
88       return counter;
89     }
90
91     boolean fail (boolean logicErrors, int pass, int counter)
92     {
93       return logicErrors && pass > 300 && counter%77 == 0;
94     }
95
96     void send(
97         Bytes key,
98         Bytes value,
99         boolean fail,
100         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
101     {
102       counter++;
103
104       if (fail)
105       {
106         value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
107       }
108
109       messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
110     }
111
112     @Override
113     public boolean canGeneratePoisonPill()
114     {
115       return false;
116     }
117
118     @Override
119     public void assertBusinessLogic()
120     {
121       for (int i=0; i<PARTITIONS; i++)
122       {
123         StateDocument stateDocument =
124             tests.stateRepository.findById(Integer.toString(i)).get();
125
126         stateDocument
127             .results
128             .entrySet()
129             .stream()
130             .forEach(entry ->
131             {
132               String user = entry.getKey();
133               List<AdderResult> resultsForUser = entry.getValue();
134
135               for (int j=0; j < resultsForUser.size(); j++)
136               {
137                 if (!(j < state.get(user).size()))
138                 {
139                   break;
140                 }
141
142                 assertThat(resultsForUser.get(j))
143                     .as("Unexpected results calculation %d of user %s", j, user)
144                     .isEqualTo(state.get(user).get(j));
145               }
146
147               assertThat(state.get(user))
148                   .as("More results calculated for user %s as expected", user)
149                   .containsAll(resultsForUser);
150             });
151       }
152     }
153   }
154 }