1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7 import org.springframework.beans.factory.annotation.Autowired;
10 import java.util.function.Consumer;
11 import java.util.stream.Collectors;
12 import java.util.stream.IntStream;
14 import static org.assertj.core.api.Assertions.assertThat;
18 public class ApplicationTests extends GenericApplicationTests<String, Message>
21 StateRepository stateRepository;
24 public ApplicationTests()
26 super(new ApplicationTestRecrodGenerator());
27 ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
31 static class ApplicationTestRecrodGenerator implements RecordGenerator
33 ApplicationTests tests;
35 final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
36 final String[] dieWilden13 =
39 .mapToObj(i -> "seeräuber-" + i)
40 .toArray(i -> new String[i]);
41 final StringSerializer stringSerializer = new StringSerializer();
42 final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
45 int counterPoisonPills;
46 int counterLogicErrors;
48 Map<String, List<AdderResult>> state;
54 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
57 counterPoisonPills = 0;
58 counterLogicErrors = 0;
63 .collect(Collectors.toMap(
64 seeräuber -> seeräuber,
65 seeräuber -> new LinkedList()));
67 int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
68 int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
71 for (int pass = 0; pass < 333; pass++)
73 for (int i = 0; i<13; i++)
75 String seeräuber = dieWilden13[i];
76 Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
78 if (message[i] > number[i])
84 poisonPill(poisonPills, pass, counterMessages),
85 logicError(logicErrors, pass, counterMessages),
87 state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
88 // Pick next number to calculate
89 number[i] = numbers[next++%numbers.length];
91 log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
96 new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
98 poisonPill(poisonPills, pass, counterMessages),
99 logicError(logicErrors, pass, counterMessages),
106 public int getNumberOfMessages()
108 return counterMessages;
112 public int getNumberOfPoisonPills()
114 return counterPoisonPills;
118 public int getNumberOfLogicErrors()
120 return counterLogicErrors;
123 boolean poisonPill (boolean poisonPills, int pass, int counter)
125 return poisonPills && pass > 300 && counter%99 == 0;
128 boolean logicError(boolean logicErrors, int pass, int counter)
130 return logicErrors && pass > 300 && counter%77 == 0;
139 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
145 value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
146 counterLogicErrors++;
150 value = new Bytes("BOOM!".getBytes());
151 counterPoisonPills++;
154 ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
155 record.headers().add("__TypeId__", type.toString().getBytes());
156 messageSender.accept(record);
160 public void assertBusinessLogic()
162 for (int i=0; i<PARTITIONS; i++)
164 StateDocument stateDocument =
165 tests.stateRepository.findById(Integer.toString(i)).get();
173 String user = entry.getKey();
174 List<AdderResult> resultsForUser = entry.getValue();
176 for (int j=0; j < resultsForUser.size(); j++)
178 if (!(j < state.get(user).size()))
183 assertThat(resultsForUser.get(j))
184 .as("Unexpected results calculation %d of user %s", j, user)
185 .isEqualTo(state.get(user).get(j));
188 assertThat(state.get(user))
189 .as("More results calculated for user %s as expected", user)
190 .containsAll(resultsForUser);