1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.producer.ProducerRecord;
5 import org.apache.kafka.common.serialization.StringSerializer;
6 import org.apache.kafka.common.utils.Bytes;
7 import org.springframework.beans.factory.annotation.Autowired;
10 import java.util.function.Consumer;
11 import java.util.stream.Collectors;
12 import java.util.stream.IntStream;
14 import static org.assertj.core.api.Assertions.assertThat;
/**
 * Concrete test class that extends GenericApplicationTests with both type
 * parameters set to String.
 */
18 public class ApplicationTests extends GenericApplicationTests<String, String>
// Repository that the record generator's assertBusinessLogic() reads from to
// load the stored state of each partition.
// NOTE(review): presumably injected by Spring (Autowired is imported), but the
// annotation line is not visible in this excerpt - confirm.
21 StateRepository stateRepository;
/**
 * Passes a fresh ApplicationTestRecrodGenerator to the superclass constructor
 * and then hands that generator a back-reference to this test instance.
 */
24 public ApplicationTests()
26 super(new ApplicationTestRecrodGenerator());
// The generator is a static nested class, so it has no implicit outer
// instance; it reaches stateRepository through this explicit reference.
27 ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
/**
 * RecordGenerator implementation used by ApplicationTests. It keeps the
 * results it expects per key in the state map while producing records, and
 * later compares them with the stored state.
 * NOTE(review): the class name contains a typo ("Recrod"); renaming it would
 * touch every reference, so it is only flagged here.
 */
31 static class ApplicationTestRecrodGenerator implements RecordGenerator
// Back-reference to the owning test instance; assigned from outside after
// construction and read when the stored state is checked.
33 ApplicationTests tests;
// Pool of upper bounds to sum up to; the generation loop cycles through it
// with next++ % numbers.length.
35 final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
// Names of the form "seeräuber-<i>", used as record keys.
// NOTE(review): the stream source that supplies i is not visible in this
// excerpt; the field name and the loops over 13 entries suggest 13 names -
// confirm.
36 final String[] dieWilden13 =
39 .mapToObj(i -> "seeräuber-" + i)
40 .toArray(i -> new String[i]);
// Serializer used for both keys and values of the generated records.
41 final StringSerializer stringSerializer = new StringSerializer();
// Pre-serialized "CALCULATE" payload, reused for every calculate message.
42 final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
// Expected results per name, in the order in which the calculate messages
// were sent; filled during generation and compared in assertBusinessLogic().
46 Map<String, List<AdderResult>> state;
// Record generation: for every name, sends the integers 1, 2, ... up to the
// currently chosen number, then a CALCULATE message, and records the expected
// sum in state.
// NOTE(review): the method header, the declarations of next and counter and
// all braces are not visible in this excerpt; comments below only describe
// the visible statements.
52 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
// Builds a map with one empty result list per name.
// NOTE(review): new LinkedList() is a raw type; new LinkedList<>() would
// avoid the unchecked warning.
58 .collect(Collectors.toMap(
59 seeräuber -> seeräuber,
60 seeräuber -> new LinkedList()));
// number[i]: upper bound currently being summed for name i (starts at 1).
62 int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
// message[i]: next integer to send for name i (starts at 1).
63 int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
// 333 passes, each visiting all 13 names once.
66 for (int pass = 0; pass < 333; pass++)
68 for (int i = 0; i<13; i++)
70 String seeräuber = dieWilden13[i];
71 Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
// All integers up to number[i] have been sent for this name, so the sum can
// be requested.
// NOTE(review): where this branch ends is not visible in this excerpt.
73 if (message[i] > number[i])
75 send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
// Expected result: (n + 1) * n / 2 is the sum 1 + 2 + ... + n for n = number[i].
76 state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
77 // Pick next number to calculate
78 number[i] = numbers[next++%numbers.length];
// Log text (German): "Seeräuber {} wants to calculate the sum for {}".
80 log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
// Sends the current value of message[i] as a decimal string and then
// increments it for the next pass.
83 Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
84 send(key, value, fail(logicErrors, pass, counter), messageSender);
/**
 * Decides whether the record about to be sent should be flagged as a failure.
 *
 * @param logicErrors whether failures are enabled at all
 * @param pass        index of the current generation pass
 * @param counter     counter value supplied by the caller
 * @return true only if logicErrors is set, pass is greater than 300 and
 *         counter is divisible by 77
 */
91 boolean fail (boolean logicErrors, int pass, int counter)
93 return logicErrors && pass > 300 && counter%77 == 0;
// Tail of the send helper: hands one record for TOPIC to messageSender.
// NOTE(review): the start of the header and the braces are not visible in
// this excerpt; callers pass (key, value, boolean from fail(...), messageSender).
100 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
// Replaces the payload with the serialized string "-1".
// NOTE(review): presumably guarded by the boolean that callers obtain from
// fail(...); the guard itself is not visible - confirm.
106 value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
// No partition is given to ProducerRecord; only topic, key and value.
109 messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
// NOTE(review): only the header is visible in this excerpt; the returned
// value cannot be confirmed from here - check it against RecordGenerator.
113 public boolean canGeneratePoisonPill()
/**
 * Compares the stored state of every partition with the results recorded in
 * the state map during record generation.
 */
119 public void assertBusinessLogic()
// One StateDocument per partition, looked up by the partition number as a string.
121 for (int i=0; i<PARTITIONS; i++)
123 StateDocument stateDocument =
// NOTE(review): presumably findById returns an Optional, so get() throws if
// no document exists for this partition; orElseThrow with a message would
// give a clearer failure - confirm against StateRepository.
124 tests.stateRepository.findById(Integer.toString(i)).get();
// Walks over every (user, results) entry of the results map.
// NOTE(review): the receiver of .results is not visible here; presumably it
// is stateDocument - confirm.
127 .results.entrySet().stream()
130 String user = entry.getKey();
131 List<AdderResult> resultsForUser = entry.getValue();
// The list recorded locally for this user must hold exactly the stored
// elements, in the same order.
133 assertThat(state.get(user))
134 .as("Unexpected results for user %s", user)
135 .containsExactlyElementsOf(resultsForUser);