1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7 import org.assertj.core.api.Assertions;
8 import org.springframework.beans.factory.annotation.Autowired;
11 import java.util.function.Consumer;
12 import java.util.stream.Collectors;
13 import java.util.stream.IntStream;
15 import static org.assertj.core.api.Assertions.assertThat;
// Concrete test class: binds both type parameters of GenericApplicationTests to String.
19 public class ApplicationTests extends GenericApplicationTests<String, String>
// Repository that assertBusinessLogic() reads the stored state documents from.
22 StateRepository stateRepository;
// Passes a fresh ApplicationTestRecrodGenerator to the superclass constructor and
// then hands that generator a back-reference to this test instance.
25 public ApplicationTests()
27 super(new ApplicationTestRecrodGenerator());
// NOTE(review): recordGenerator is not declared in the visible code; presumably it is
// the superclass field holding the generator passed above, with a more general
// declared type (hence the cast) - confirm.
28 ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
// RecordGenerator implementation that the ApplicationTests constructor passes to its superclass.
32 static class ApplicationTestRecrodGenerator implements RecordGenerator
// Back-reference to the owning test instance; assigned in the ApplicationTests constructor
// and used to reach its stateRepository.
34 ApplicationTests tests;
// Pool of numbers to sum up; the generation loop cycles through it by index.
36 final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
// Names of the form "seeräuber-<i>"; they serve as record keys and as keys of `state`.
// NOTE(review): the source of the index stream is not visible here; the generation
// loop reads indices 0..12, so 13 entries are expected - confirm.
37 final String[] dieWilden13 =
40 .mapToObj(i -> "seeräuber-" + i)
41 .toArray(i -> new String[i]);
// Serializer used to turn keys and values into byte arrays before wrapping them in Bytes.
42 final StringSerializer stringSerializer = new StringSerializer();
// Pre-serialized "CALCULATE" value, reused every time a calculation is requested.
43 final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
// Expected results per name, filled while generating records and compared against
// the stored results in assertBusinessLogic().
47 Map<String, List<AdderResult>> state;
// Callback that receives every ProducerRecord produced by this generator.
53 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
// Maps every name to a fresh, empty result list. The diamond operator replaces the
// former raw LinkedList, so the value type is inferred instead of relying on an
// unchecked conversion.
59 .collect(Collectors.toMap(
60 seeräuber -> seeräuber,
61 seeräuber -> new LinkedList<>()));
// number[i]: the number whose sum 1..number[i] is currently requested for name i.
63 int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
// message[i]: the next value to send for name i (post-incremented after each send).
64 int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
// 333 passes over all 13 names.
67 for (int pass = 0; pass < 333; pass++)
69 for (int i = 0; i<13; i++)
71 String seeräuber = dieWilden13[i];
72 Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
// Every value up to number[i] has been sent: request the calculation and record
// the expected result, the sum 1..n computed as (n + 1) * n / 2.
74 if (message[i] > number[i])
76 send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
77 state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
78 // Pick next number to calculate
79 number[i] = numbers[next++%numbers.length];
81 log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
// Send the current value for this name and advance to the next one.
84 Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
85 send(key, value, fail(logicErrors, pass, counter), messageSender);
// Decides whether the record about to be sent should be flagged as failing.
// Returns true only if logicErrors is set, pass is greater than 300, and counter
// is a multiple of 77; otherwise false.
92 boolean fail (boolean logicErrors, int pass, int counter)
94 return logicErrors && pass > 300 && counter%77 == 0;
// Callback that receives the ProducerRecord built below.
// NOTE(review): the start of this signature is not visible; presumably this is the
// send(key, value, fail, messageSender) helper called from the generation loop - confirm.
101 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
// Replaces the value with the serialized string "-1".
// NOTE(review): the guarding condition is not visible here; presumably this only
// happens when a logic error is to be injected - confirm.
107 value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
// Hands the record for TOPIC with the given key and value to the callback.
110 messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
114 public boolean canGeneratePoisonPill()
// Compares the results stored in the state repository with the expected results
// recorded in `state` while the records were generated.
120 public void assertBusinessLogic()
// One state document is looked up per partition index, using the index as document id.
122 for (int i=0; i<PARTITIONS; i++)
124 StateDocument stateDocument =
// get() on the lookup result fails the test if no document exists for this id.
125 tests.stateRepository.findById(Integer.toString(i)).get();
// NOTE(review): the code producing `entry` is not visible here; each entry maps a
// user name to the list of results stored for that user.
133 String user = entry.getKey();
134 List<AdderResult> resultsForUser = entry.getValue();
// Walk the stored results in order and compare them position by position.
136 for (int j=0; j < resultsForUser.size(); j++)
// Taken when more results are stored for the user than were recorded in `state`.
138 if (!(j < state.get(user).size()))
143 assertThat(resultsForUser.get(j))
// %d formats the int index; the former %i is not a valid java.util.Formatter
// conversion and makes formatting the description throw UnknownFormatConversionException.
144 .as("Unexpected results calculation %d of user %s", j, user)
145 .isEqualTo(state.get(user).get(j));
// Every stored result must be among the expected results for this user.
148 assertThat(state.get(user))
149 .as("More results calculated for user %s as expected", user)
150 .containsAll(resultsForUser);