1 package de.juplo.kafka;
3 import org.apache.kafka.clients.producer.ProducerRecord;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.apache.kafka.common.utils.Bytes;
6 import org.springframework.beans.factory.annotation.Autowired;
9 import java.util.function.Consumer;
10 import java.util.stream.Collectors;
11 import java.util.stream.IntStream;
13 import static org.assertj.core.api.Assertions.assertThat;
16 public class ApplicationTests extends GenericApplicationTests<String, String>
22 public ApplicationTests()
24 super(new ApplicationTestRecrodGenerator());
25 ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
29 static class ApplicationTestRecrodGenerator implements RecordGenerator
31 ApplicationTests tests;
33 final int[] numbers = {1, 7, 3, 2, 33, 6, 11};
34 final String[] dieWilden13 =
37 .mapToObj(i -> "seeräuber-" + i)
38 .toArray(i -> new String[i]);
39 final StringSerializer stringSerializer = new StringSerializer();
40 final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
44 Map<String, List<AdderResult>> state;
50 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
56 .collect(Collectors.toMap(
57 seeräuber -> seeräuber,
58 seeräuber -> new LinkedList()));
60 for (int i = 0; i < 33; i++)
62 String seeräuber = dieWilden13[i % 13];
63 int number = numbers[i % 7];
65 Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
67 for (int message = 1; message <= number; message++)
69 Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
70 send(key, value, logicErrors, messageSender);
72 send(key, calculateMessage, logicErrors, messageSender);
74 state.get(seeräuber).add(new AdderResult(number, (number + 1) * number / 2));
84 Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
92 value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
96 messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
100 public boolean canGeneratePoisonPill()
106 public void assertBusinessLogic()
112 .flatMap(map -> map.entrySet().stream())
115 String user = entry.getKey();
116 List<AdderResult> resultsForUser = entry.getValue();
118 assertThat(state.get(user))
119 .describedAs("Unexpected results for user %s", user) // AssertJ expands descriptions via String.format, so the placeholder must be %s; an SLF4J-style {} is never substituted and the user would be missing from the failure message
120 .containsExactlyElementsOf(resultsForUser);