test: Überprüfung der Fachlogik ergänzt
[demos/kafka/training] / src / test / java / de / juplo / kafka / ApplicationTests.java
1 package de.juplo.kafka;
2
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.springframework.beans.factory.annotation.Autowired;
7
8 import java.util.*;
9 import java.util.function.Consumer;
10 import java.util.stream.Collectors;
11 import java.util.stream.IntStream;
12
13 import static org.assertj.core.api.Assertions.assertThat;
14
15
16 public class ApplicationTests extends GenericApplicationTests<String, String>
17 {
18   @Autowired
19   AdderResults results;
20
21
22   public ApplicationTests()
23   {
24     super(new ApplicationTestRecrodGenerator());
25     ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
26   }
27
28
29   static class ApplicationTestRecrodGenerator implements RecordGenerator
30   {
31     ApplicationTests tests;
32
33     final int[] numbers = {1, 7, 3, 2, 33, 6, 11};
34     final String[] dieWilden13 =
35         IntStream
36             .range(1, 14)
37             .mapToObj(i -> "seeräuber-" + i)
38             .toArray(i -> new String[i]);
39     final StringSerializer stringSerializer = new StringSerializer();
40     final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
41
42     int counter = 0;
43
44     Map<String, List<AdderResult>> state;
45
46     @Override
47     public int generate(
48         boolean poisonPills,
49         boolean logicErrors,
50         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
51     {
52       counter = 0;
53       state =
54           Arrays
55               .stream(dieWilden13)
56               .collect(Collectors.toMap(
57                   seeräuber -> seeräuber,
58                   seeräuber -> new LinkedList()));
59
60       for (int i = 0; i < 33; i++)
61       {
62         String seeräuber = dieWilden13[i % 13];
63         int number = numbers[i % 7];
64
65         Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
66
67         for (int message = 1; message <= number; message++)
68         {
69           Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
70           send(key, value, logicErrors, messageSender);
71         }
72         send(key, calculateMessage, logicErrors, messageSender);
73
74         state.get(seeräuber).add(new AdderResult(number, (number + 1) * number / 2));
75       }
76
77       return counter;
78     }
79
80     void send(
81         Bytes key,
82         Bytes value,
83         boolean logicErrors,
84         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
85     {
86       counter++;
87
88       if (counter == 77)
89       {
90         if (logicErrors)
91         {
92           value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
93         }
94       }
95
96       messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
97     }
98
99     @Override
100     public boolean canGeneratePoisonPill()
101     {
102       return false;
103     }
104
105     @Override
106     public void assertBusinessLogic()
107     {
108       tests.results
109           .getState()
110           .values()
111           .stream()
112           .flatMap(map -> map.entrySet().stream())
113           .forEach(entry ->
114           {
115             String user = entry.getKey();
116             List<AdderResult> resultsForUser = entry.getValue();
117
118             assertThat(state.get(user))
119                 .describedAs("Unexpected results for user {}", user)
120                 .containsExactlyElementsOf(resultsForUser);
121           });
122     }
123   }
124 }