From: Kai Moritz Date: Fri, 9 Sep 2022 11:43:58 +0000 (+0200) Subject: Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets X-Git-Tag: wip-merge-deserialization--sumup-adder--ohne-stored-offsets X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=595eab489c638b07072f6ec7e3c6f52000295931;p=demos%2Fkafka%2Ftraining Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets * Conflicts: ** src/main/java/de/juplo/kafka/ApplicationConfiguration.java ** src/main/java/de/juplo/kafka/EndlessConsumer.java ** src/test/java/de/juplo/kafka/ApplicationIT.java ** src/test/java/de/juplo/kafka/ApplicationTests.java ** src/test/java/de/juplo/kafka/GenericApplicationTests.java ** src/test/java/de/juplo/kafka/TestRecordHandler.java --- 595eab489c638b07072f6ec7e3c6f52000295931 diff --cc src/main/java/de/juplo/kafka/EndlessConsumer.java index 3ff479c,788a4a7..c3ed7c3 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@@ -2,7 -2,10 +2,9 @@@ package de.juplo.kafka import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; - import org.apache.kafka.clients.consumer.*; + import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + import org.apache.kafka.clients.consumer.ConsumerRecord; + import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java index 6a037eb,b7f8308..a150176 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@@ -1,154 -1,83 +1,155 @@@ package de.juplo.kafka; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.test.context.ContextConfiguration; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.*; + import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; -@ContextConfiguration(classes = ApplicationTests.Configuration.class) -public class ApplicationTests extends GenericApplicationTests +@Slf4j +public class ApplicationTests extends GenericApplicationTests { + @Autowired + StateRepository stateRepository; + + public ApplicationTests() { - super( - new RecordGenerator() + super(new ApplicationTestRecrodGenerator()); - ((ApplicationTestRecrodGenerator)recordGenerator).tests = this; ++ ((ApplicationTestRecrodGenerator) recordGenerator).tests = this; + } + + + static class ApplicationTestRecrodGenerator implements RecordGenerator + { + ApplicationTests tests; + + final int[] numbers = {1, 77, 33, 2, 66, 666, 11}; + final String[] dieWilden13 = - IntStream - .range(1, 14) - .mapToObj(i -> "seeräuber-" + i) - .toArray(i -> new String[i]); ++ IntStream ++ .range(1, 14) ++ .mapToObj(i -> "seeräuber-" + i) ++ .toArray(i -> new String[i]); + final StringSerializer stringSerializer = new StringSerializer(); + final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE")); + + int counter = 0; + + Map> state; + + @Override + public int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender) ++ boolean poisonPills, ++ boolean logicErrors, ++ Consumer> messageSender) + { + counter = 0; + state = - Arrays - .stream(dieWilden13) - .collect(Collectors.toMap( - seeräuber -> seeräuber, - seeräuber -> new LinkedList())); - - int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; - int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; ++ Arrays ++ .stream(dieWilden13) ++ .collect(Collectors.toMap( ++ seeräuber -> seeräuber, ++ seeräuber -> new LinkedList())); ++ ++ int number[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; ++ int message[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; + int next = 0; + + for (int pass = 0; pass < 333; pass++) + { - for (int i = 0; i<13; i++) ++ for (int i = 0; i < 13; i++) { - final StringSerializer stringSerializer = new StringSerializer(); - final LongSerializer longSerializer = new LongSerializer(); + String seeräuber = dieWilden13[i]; + Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber)); + + if (message[i] > number[i]) + { + send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender); + state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2)); + // Pick next number to calculate - number[i] = numbers[next++%numbers.length]; ++ number[i] = numbers[next++ % numbers.length]; + message[i] = 1; + log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]); + } + + Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++))); + send(key, value, fail(logicErrors, pass, counter), messageSender); + } + } + + return counter; + } - boolean fail (boolean logicErrors, int pass, int counter) ++ boolean fail(boolean logicErrors, int pass, int counter) + { - return logicErrors && pass > 300 && counter%77 == 0; ++ return logicErrors && pass > 300 && counter % 77 == 0; + } + + void send( - Bytes key, - Bytes value, - boolean fail, - Consumer> messageSender) ++ Bytes key, ++ Bytes value, ++ boolean fail, ++ Consumer> messageSender) + { + counter++; + + if (fail) + { + value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1))); + } + + messageSender.accept(new ProducerRecord<>(TOPIC, key, value)); + } + + @Override + public boolean canGeneratePoisonPill() + { + return false; + } - @Override - public int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender) + @Override + public void assertBusinessLogic() + { - for (int i=0; i - { - String user = entry.getKey(); - List resultsForUser = entry.getValue(); ++ .results ++ .entrySet() ++ .stream() ++ .forEach(entry -> + { - int i = 0; ++ String user = entry.getKey(); ++ List resultsForUser = entry.getValue(); - for (int j=0; j < resultsForUser.size(); j++) - for (int partition = 0; partition < 10; partition++) ++ for (int j = 0; j < resultsForUser.size(); j++) + { - for (int key = 0; key < 10000; key++) ++ if (!(j < state.get(user).size())) { - if (!(j < state.get(user).size())) - i++; - - Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i)); - if (i == 99977) -- { - break; - if (logicErrors) - { - value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE)); - } - if (poisonPills) - { - value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!")); - } -- } -- - assertThat(resultsForUser.get(j)) - .as("Unexpected results calculation %d of user %s", j, user) - .isEqualTo(state.get(user).get(j)); - ProducerRecord record = - new ProducerRecord<>( - TOPIC, - partition, - new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))), - value); - - messageSender.accept(record); ++ break; } - } - - return i; - } - }); - } - assertThat(state.get(user)) - .as("More results calculated for user %s as expected", user) - .containsAll(resultsForUser); - }); ++ assertThat(resultsForUser.get(j)) ++ .as("Unexpected results calculation %d of user %s", j, user) ++ .isEqualTo(state.get(user).get(j)); ++ } + - @TestConfiguration - public static class Configuration - { - @Bean - public RecordHandler applicationRecordHandler() - { - return (record) -> - { - if (record.value() == Long.MIN_VALUE) - throw new RuntimeException("BOOM (Logic-Error)!"); - }; ++ assertThat(state.get(user)) ++ .as("More results calculated for user %s as expected", user) ++ .containsAll(resultsForUser); ++ }); + } } } --} ++} diff --cc src/test/java/de/juplo/kafka/GenericApplicationTests.java index 8124c81,4883f75..e16aea7 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@@ -12,11 -11,9 +12,12 @@@ import org.apache.kafka.common.serializ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoProperties; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; @@@ -58,24 -51,16 +58,21 @@@ abstract class GenericApplicationTests< @Autowired Consumer> consumer; @Autowired - ApplicationProperties properties; - @Autowired - ExecutorService executor; + ApplicationProperties applicationProperties; @Autowired + MongoClient mongoClient; + @Autowired + MongoProperties mongoProperties; + @Autowired + RebalanceListener rebalanceListener; + @Autowired - RecordHandler recordHandler; + TestRecordHandler recordHandler; + @Autowired + EndlessConsumer endlessConsumer; - KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; - EndlessConsumer endlessConsumer; Map oldOffsets; - Map seenOffsets; - Set> receivedRecords; final RecordGenerator recordGenerator;