1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7 import org.springframework.beans.factory.annotation.Autowired;
10 import java.util.function.Consumer;
11 import java.util.stream.Collectors;
12 import java.util.stream.IntStream;
14 import static org.assertj.core.api.Assertions.assertThat;
18 public class ApplicationTests extends GenericApplicationTests<String, Message>
21 StateRepository stateRepository;
24 public ApplicationTests()
26 super(new ApplicationTestRecrodGenerator());
27 ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
31 static class ApplicationTestRecrodGenerator implements RecordGenerator
33 ApplicationTests tests;
35 final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
36 final String[] dieWilden13 =
39 .mapToObj(i -> "seeräuber-" + i)
40 .toArray(i -> new String[i]);
41 final StringSerializer stringSerializer = new StringSerializer();
42 final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
46 Map<String, List<AdderResult>> state;
52 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
58 .collect(Collectors.toMap(
59 seeräuber -> seeräuber,
60 seeräuber -> new LinkedList()));
62 int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
63 int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
66 for (int pass = 0; pass < 333; pass++)
68 for (int i = 0; i<13; i++)
70 String seeräuber = dieWilden13[i];
71 Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
73 if (message[i] > number[i])
79 poisonPill(poisonPills, pass, counter),
80 logicError(logicErrors, pass, counter),
82 state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
83 // Pick next number to calculate
84 number[i] = numbers[next++%numbers.length];
86 log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
91 new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
93 poisonPill(poisonPills, pass, counter),
94 logicError(logicErrors, pass, counter),
102 boolean poisonPill (boolean poisonPills, int pass, int counter)
104 return poisonPills && pass > 300 && counter%99 == 0;
107 boolean logicError(boolean logicErrors, int pass, int counter)
109 return logicErrors && pass > 300 && counter%77 == 0;
118 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
124 value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
128 value = new Bytes("BOOM!".getBytes());
131 ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
132 record.headers().add("__TypeId__", type.toString().getBytes());
133 messageSender.accept(record);
137 public void assertBusinessLogic()
139 for (int i=0; i<PARTITIONS; i++)
141 StateDocument stateDocument =
142 tests.stateRepository.findById(Integer.toString(i)).get();
150 String user = entry.getKey();
151 List<AdderResult> resultsForUser = entry.getValue();
153 for (int j=0; j < resultsForUser.size(); j++)
155 if (!(j < state.get(user).size()))
160 assertThat(resultsForUser.get(j))
161 .as("Unexpected results calculation %d of user %s", j, user)
162 .isEqualTo(state.get(user).get(j));
165 assertThat(state.get(user))
166 .as("More results calculated for user %s as expected", user)
167 .containsAll(resultsForUser);