From: Kai Moritz Date: Fri, 9 Sep 2022 11:43:58 +0000 (+0200) Subject: Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets X-Git-Tag: wip-merge-deserialization--sumup-adder--ohne-stored-offsets X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Ftags%2Fwip-merge-deserialization--sumup-adder--ohne-stored-offsets;hp=627763878b235ba168c7f55a1ef448851b027bfc;p=demos%2Fkafka%2Ftraining Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets * Conflicts: ** src/main/java/de/juplo/kafka/ApplicationConfiguration.java ** src/main/java/de/juplo/kafka/EndlessConsumer.java ** src/test/java/de/juplo/kafka/ApplicationIT.java ** src/test/java/de/juplo/kafka/ApplicationTests.java ** src/test/java/de/juplo/kafka/GenericApplicationTests.java ** src/test/java/de/juplo/kafka/TestRecordHandler.java --- diff --git a/pom.xml b/pom.xml index ecb559a..6699408 100644 --- a/pom.xml +++ b/pom.xml @@ -116,6 +116,9 @@ + + maven-failsafe-plugin + diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 3ff479c..c3ed7c3 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -2,7 +2,9 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java index cded0ee..3711a83 100644 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -8,7 +8,7 @@ import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.test.web.server.LocalServerPort; import org.springframework.kafka.test.context.EmbeddedKafka; -import static de.juplo.kafka.ApplicationTests.TOPIC; +import static de.juplo.kafka.ApplicationIT.TOPIC; @SpringBootTest( diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 6a037eb..a150176 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -7,6 +7,7 @@ import org.apache.kafka.common.utils.Bytes; import org.springframework.beans.factory.annotation.Autowired; import java.util.*; + import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -24,7 +25,7 @@ public class ApplicationTests extends GenericApplicationTests public ApplicationTests() { super(new ApplicationTestRecrodGenerator()); - ((ApplicationTestRecrodGenerator)recordGenerator).tests = this; + ((ApplicationTestRecrodGenerator) recordGenerator).tests = this; } @@ -34,10 +35,10 @@ public class ApplicationTests extends GenericApplicationTests final int[] numbers = {1, 77, 33, 2, 66, 666, 11}; final String[] dieWilden13 = - IntStream - .range(1, 14) - .mapToObj(i -> "seeräuber-" + i) - .toArray(i -> new String[i]); + IntStream + .range(1, 14) + .mapToObj(i -> "seeräuber-" + i) + .toArray(i -> new String[i]); final StringSerializer stringSerializer = new StringSerializer(); final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE")); @@ -47,25 +48,25 @@ public class ApplicationTests extends GenericApplicationTests @Override public int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender) + boolean poisonPills, + boolean logicErrors, + Consumer> messageSender) { counter = 0; state = - Arrays - .stream(dieWilden13) - .collect(Collectors.toMap( - seeräuber -> seeräuber, - seeräuber -> new LinkedList())); - - int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; - int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; + Arrays + .stream(dieWilden13) + .collect(Collectors.toMap( + seeräuber -> seeräuber, + seeräuber -> new LinkedList())); + + int number[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; + int message[] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}; int next = 0; for (int pass = 0; pass < 333; pass++) { - for (int i = 0; i<13; i++) + for (int i = 0; i < 13; i++) { String seeräuber = dieWilden13[i]; Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber)); @@ -75,7 +76,7 @@ public class ApplicationTests extends GenericApplicationTests send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender); state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2)); // Pick next number to calculate - number[i] = numbers[next++%numbers.length]; + number[i] = numbers[next++ % numbers.length]; message[i] = 1; log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]); } @@ -88,16 +89,16 @@ public class ApplicationTests extends GenericApplicationTests return counter; } - boolean fail (boolean logicErrors, int pass, int counter) + boolean fail(boolean logicErrors, int pass, int counter) { - return logicErrors && pass > 300 && counter%77 == 0; + return logicErrors && pass > 300 && counter % 77 == 0; } void send( - Bytes key, - Bytes value, - boolean fail, - Consumer> messageSender) + Bytes key, + Bytes value, + boolean fail, + Consumer> messageSender) { counter++; @@ -118,37 +119,37 @@ public class ApplicationTests extends GenericApplicationTests @Override public void assertBusinessLogic() { - for (int i=0; i - { - String user = entry.getKey(); - List resultsForUser = entry.getValue(); + .results + .entrySet() + .stream() + .forEach(entry -> + { + String user = entry.getKey(); + List resultsForUser = entry.getValue(); - for (int j=0; j < resultsForUser.size(); j++) + for (int j = 0; j < resultsForUser.size(); j++) + { + if (!(j < state.get(user).size())) { - if (!(j < state.get(user).size())) - { - break; - } - - assertThat(resultsForUser.get(j)) - .as("Unexpected results calculation %d of user %s", j, user) - .isEqualTo(state.get(user).get(j)); + break; } - assertThat(state.get(user)) - .as("More results calculated for user %s as expected", user) - .containsAll(resultsForUser); - }); + assertThat(resultsForUser.get(j)) + .as("Unexpected results calculation %d of user %s", j, user) + .isEqualTo(state.get(user).get(j)); + } + + assertThat(state.get(user)) + .as("More results calculated for user %s as expected", user) + .containsAll(resultsForUser); + }); } } } -} +} \ No newline at end of file diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 8124c81..e16aea7 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -17,6 +17,7 @@ import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; @@ -24,7 +25,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; import java.util.*; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -54,13 +54,11 @@ abstract class GenericApplicationTests @Autowired - KafkaConsumer kafkaConsumer; + org.apache.kafka.clients.consumer.Consumer kafkaConsumer; @Autowired Consumer> consumer; @Autowired - ApplicationProperties properties; - @Autowired - ExecutorService executor; + ApplicationProperties applicationProperties; @Autowired MongoClient mongoClient; @Autowired @@ -68,14 +66,13 @@ abstract class GenericApplicationTests @Autowired RebalanceListener rebalanceListener; @Autowired - RecordHandler recordHandler; + TestRecordHandler recordHandler; + @Autowired + EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; - EndlessConsumer endlessConsumer; Map oldOffsets; - Map seenOffsets; - Set> receivedRecords; final RecordGenerator recordGenerator; @@ -99,7 +96,7 @@ abstract class GenericApplicationTests await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -107,7 +104,7 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) @@ -131,7 +128,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -140,8 +137,8 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(seenOffsets); - assertThat(receivedRecords.size()) + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + assertThat(recordHandler.receivedRecords.size()) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -168,7 +165,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -176,7 +173,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -229,7 +226,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = seenOffsets.get(tp) + 1; + Long newOffset = recordHandler.seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -329,16 +326,16 @@ abstract class GenericApplicationTests { Properties props; props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", applicationProperties.getBootstrapServer()); props.put("linger.ms", 100); props.put("key.serializer", BytesSerializer.class.getName()); props.put("value.serializer", BytesSerializer.class.getName()); testRecordProducer = new KafkaProducer<>(props); props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", applicationProperties.getBootstrapServer()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); + props.put("group.id", applicationProperties.getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); @@ -347,43 +344,30 @@ abstract class GenericApplicationTests seekToEnd(); oldOffsets = new HashMap<>(); - seenOffsets = new HashMap<>(); - receivedRecords = new HashSet<>(); + recordHandler.seenOffsets = new HashMap<>(); + recordHandler.receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - seenOffsets.put(tp, offset - 1); + recordHandler.seenOffsets.put(tp, offset - 1); }); - TestRecordHandler captureOffsetAndExecuteTestHandler = - new TestRecordHandler(recordHandler) - { - @Override - public void onNewRecord(ConsumerRecord record) - { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - } - }; - - endlessConsumer = - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - rebalanceListener, - captureOffsetAndExecuteTestHandler); - endlessConsumer.start(); } @AfterEach public void deinit() { + try + { + endlessConsumer.stop(); + } + catch (Exception e) + { + log.debug("{}", e.toString()); + } + try { testRecordProducer.close(); @@ -400,5 +384,10 @@ abstract class GenericApplicationTests @Import(ApplicationConfiguration.class) public static class Configuration { + @Bean + public RecordHandler recordHandler(RecordHandler applicationRecordHandler) + { + return new TestRecordHandler(applicationRecordHandler); + } } } diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java index b4efdd6..37d3f65 100644 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -2,16 +2,28 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; +import java.util.Set; @RequiredArgsConstructor -public abstract class TestRecordHandler implements RecordHandler +public class TestRecordHandler implements RecordHandler { private final RecordHandler handler; + Map seenOffsets; + Set> receivedRecords; - public abstract void onNewRecord(ConsumerRecord record); + public void onNewRecord(ConsumerRecord record) + { + seenOffsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); + receivedRecords.add(record); + } @Override public void accept(ConsumerRecord record)