From: Kai Moritz Date: Tue, 16 Aug 2022 16:58:10 +0000 (+0200) Subject: Verbesserungen und Fachlogik-Test aus 'sumup-adder' gemerged X-Git-Tag: sumup-adder---lvm-2-tage~10^2~2 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=890fd85c334a078610701bd6e571d133df69473f;hp=e5c029af3690ae8ebed729af4f06296cef13fa3c;p=demos%2Fkafka%2Ftraining Verbesserungen und Fachlogik-Test aus 'sumup-adder' gemerged --- diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java index 1f3d9aa..d525182 100644 --- a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java +++ b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java @@ -8,7 +8,7 @@ import java.util.Optional; public class AdderBusinessLogic { - private final Map state; + private final Map state; public AdderBusinessLogic() @@ -16,37 +16,31 @@ public class AdderBusinessLogic this(new HashMap<>()); } - public AdderBusinessLogic(Map state) + public AdderBusinessLogic(Map state) { this.state = state; } - public synchronized void startSum(String user) - { - if (state.containsKey(user)) - throw new IllegalStateException("Sumation for " + user + " already in progress, state: " + state.get(user)); - - state.put(user, 0l); - } - public synchronized Optional getSum(String user) { - return Optional.ofNullable(state.get(user)); + return Optional.ofNullable(state.get(user)).map(result -> result.sum); } public synchronized void addToSum(String user, Integer value) { - if (!state.containsKey(user)) - throw new IllegalStateException("No sumation for " + user + " in progress"); if (value == null || value < 1) throw new IllegalArgumentException("Not a positive number: " + value); - long result = state.get(user) + value; - state.put(user, result); + long sum = + Optional + .ofNullable(state.get(user)) + .map(result -> result.sum) + .orElse(0l); + state.put(user, new AdderResult(value, sum + value)); } - public synchronized Long endSum(String user) + public synchronized AdderResult calculate(String user) { if (!state.containsKey(user)) throw new IllegalStateException("No sumation for " + user + " in progress"); @@ -54,7 +48,7 @@ public class AdderBusinessLogic return state.remove(user); } - protected Map getState() + protected Map getState() { return state; } diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java new file mode 100644 index 0000000..44b7da8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/AdderResult.java @@ -0,0 +1,21 @@ +package de.juplo.kafka; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +@Getter +@EqualsAndHashCode +public class AdderResult +{ + final int number; + final long sum; + + @Override + public String toString() + { + return "sum(" + number + ") = " + sum; + } +} diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java new file mode 100644 index 0000000..e7f5602 --- /dev/null +++ b/src/main/java/de/juplo/kafka/AdderResults.java @@ -0,0 +1,47 @@ +package de.juplo.kafka; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + + +public class AdderResults +{ + private final Map>> results = new HashMap<>(); + + + public void addResults(Integer partition, String user, AdderResult result) + { + Map> resultsByUser = this.results.get(partition); + + List results = resultsByUser.get(user); + if (results == null) + { + results = new LinkedList<>(); + resultsByUser.put(user, results); + } + + results.add(result); + } + + protected void addPartition(Integer partition, Map> results) + { + this.results.put(partition, results); + } + + protected Map> removePartition(Integer partition) + { + return this.results.remove(partition); + } + + public Map>> getState() + { + return results; + } + + public Map> getState(Integer partition) + { + return results.get(partition); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 5e1f8fb..b58295f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -17,19 +17,27 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler recordHandler() + public ApplicationRecordHandler recordHandler(AdderResults adderResults) { - return new ApplicationRecordHandler(); + return new ApplicationRecordHandler(adderResults); + } + + @Bean + public AdderResults adderResults() + { + return new AdderResults(); } @Bean public ApplicationRebalanceListener rebalanceListener( ApplicationRecordHandler recordHandler, + AdderResults adderResults, StateRepository stateRepository, ApplicationProperties properties) { return new ApplicationRebalanceListener( recordHandler, + adderResults, stateRepository, properties.getClientId(), Clock.systemDefaultZone(), diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 7256732..32e14e8 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -7,8 +7,7 @@ import org.apache.kafka.common.TopicPartition; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.Collection; -import java.util.Map; +import java.util.*; @RequiredArgsConstructor @@ -16,11 +15,14 @@ import java.util.Map; public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener { private final ApplicationRecordHandler recordHandler; + private final AdderResults adderResults; private final StateRepository stateRepository; private final String id; private final Clock clock; private final Duration commitInterval; + private final Set partitions = new HashSet<>(); + private Instant lastCommit = Instant.EPOCH; @Override @@ -30,11 +32,13 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe { Integer partition = tp.partition(); log.info("{} - adding partition: {}", id, partition); + this.partitions.add(partition); StateDocument document = stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); recordHandler.addPartition(partition, document.state); + adderResults.addPartition(partition, document.results); }); } @@ -45,17 +49,19 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe { Integer partition = tp.partition(); log.info("{} - removing partition: {}", id, partition); - Map removed = recordHandler.removePartition(partition); - for (String key : removed.keySet()) + this.partitions.remove(partition); + Map state = recordHandler.removePartition(partition); + for (String key : state.keySet()) { log.info( "{} - Seen {} messages for partition={}|key={}", id, - removed.get(key), + state.get(key), partition, key); } - stateRepository.save(new StateDocument(partition, removed)); + Map> results = adderResults.removePartition(partition); + stateRepository.save(new StateDocument(partition, state, results)); }); } @@ -66,10 +72,11 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data, last commit: {}", lastCommit); - recordHandler.getState().forEach((partiton, adder) -> stateRepository.save( + partitions.forEach(partition -> stateRepository.save( new StateDocument( - partiton, - adder.getState()))); + partition, + recordHandler.getState(partition).getState(), + adderResults.getState(partition)))); lastCommit = clock.instant(); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index d0d385c..596f3da 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -7,9 +8,12 @@ import java.util.HashMap; import java.util.Map; +@RequiredArgsConstructor @Slf4j public class ApplicationRecordHandler implements RecordHandler { + private final AdderResults results; + private final Map state = new HashMap<>(); @@ -19,29 +23,24 @@ public class ApplicationRecordHandler implements RecordHandler Integer partition = record.partition(); String user = record.key(); String message = record.value(); - switch (message) + + if (message.equals("CALCULATE")) { - case "START": - state.get(partition).startSum(user); - break; - - case "END": - Long result = state.get(partition).endSum(user); - log.info("New result for {}: {}", user, result); - break; - - default: - state.get(partition).addToSum(user, Integer.parseInt(message)); - break; + AdderResult result = state.get(partition).calculate(user); + log.info("New result for {}: {}", user, result); + results.addResults(partition, user, result); + return; } + + state.get(partition).addToSum(user, Integer.parseInt(message)); } - protected void addPartition(Integer partition, Map state) + protected void addPartition(Integer partition, Map state) { this.state.put(partition, new AdderBusinessLogic(state)); } - protected Map removePartition(Integer partition) + protected Map removePartition(Integer partition) { return this.state.remove(partition).getState(); } @@ -51,4 +50,9 @@ public class ApplicationRecordHandler implements RecordHandler { return state; } + + public AdderBusinessLogic getState(Integer partition) + { + return state.get(partition); + } } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index d389271..26a5bc8 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -5,6 +5,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -17,6 +18,7 @@ public class DriverController { private final EndlessConsumer consumer; private final ApplicationRecordHandler recordHandler; + private final AdderResults results; @PostMapping("start") @@ -33,7 +35,7 @@ public class DriverController @GetMapping("state") - public Map> state() + public Map> state() { return recordHandler @@ -46,7 +48,7 @@ public class DriverController } @GetMapping("state/{user}") - public ResponseEntity seen(@PathVariable String user) + public ResponseEntity state(@PathVariable String user) { for (AdderBusinessLogic adder : recordHandler.getState().values()) { @@ -58,6 +60,25 @@ public class DriverController return ResponseEntity.notFound().build(); } + @GetMapping("results") + public Map>> results() + { + return results.getState(); + } + + @GetMapping("results/{user}") + public ResponseEntity> results(@PathVariable String user) + { + for (Map> resultsByUser : this.results.getState().values()) + { + List results = resultsByUser.get(user); + if (results != null) + return ResponseEntity.ok(results); + } + + return ResponseEntity.notFound().build(); + } + @ExceptionHandler @ResponseStatus(HttpStatus.BAD_REQUEST) diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index c85cc38..ae8eb51 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -5,6 +5,7 @@ import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -14,7 +15,8 @@ public class StateDocument { @Id public String id; - public Map state; + public Map state; + public Map> results; public StateDocument() { @@ -24,13 +26,16 @@ public class StateDocument { this.id = Integer.toString(partition); this.state = new HashMap<>(); + this.results = new HashMap<>(); } public StateDocument( Integer partition, - Map state) + Map state, + Map> results) { this.id = Integer.toString(partition); this.state = state; + this.results = results; } } diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java index 435f036..8e49263 100644 --- a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java +++ b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java @@ -16,23 +16,6 @@ import static org.assertj.core.api.Assertions.*; public class AdderBusinessLogicTest { - @Test - @DisplayName("A new sum can be started, if it does not exist") - public void testStartSumPossibleIfNotExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatNoException().isThrownBy(() -> adder.startSum("foo")); - } - - @Test - @DisplayName("Starting an already existing sum again, causes an IllegalStateException") - public void testStartSumCausesExceptionIfExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThatIllegalStateException().isThrownBy(() -> adder.startSum("foo")); - } - @Test @DisplayName("An empty Optional should be returned, for a non-existing sum") public void testGetSumReturnsEmptyOptionalForNonExistingSum() @@ -46,101 +29,83 @@ public class AdderBusinessLogicTest public void testGetSumReturnsNonEmptyOptionalForExistingSum() { AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); + adder.addToSum("foo", 6); assertThat(adder.getSum("foo")).isNotEmpty(); } @Test - @DisplayName("A sum can be ended, if it does exist") - public void testEndSumPossibleIfSumExists() + @DisplayName("A sum can be calculated, if it does exist") + public void testCalculatePossibleIfSumExists() { AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThatNoException().isThrownBy(() -> adder.endSum("foo")); + adder.addToSum("foo", 6); + assertThatNoException().isThrownBy(() -> adder.calculate("foo")); } @Test @DisplayName("An existing sum is removed, if ended") - public void testEndSumRemovesSumIfSumExists() + public void testCalculateRemovesSumIfSumExists() { AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - adder.endSum("foo"); + adder.addToSum("foo", 6); + adder.calculate("foo"); assertThat(adder.getSum("foo")).isEmpty(); } @Test - @DisplayName("An existing Sum returns a non-null value, if ended") - public void testEndSumReturnsNonNullValueIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThat(adder.endSum("foo")).isNotNull(); - } - - @Test - @DisplayName("An existing Sum returns a non-negative value, if ended") - public void testEndSumReturnsNonNegativeValueIfSumExists() + @DisplayName("An existing sum returns a non-null value, if calculated") + public void testCalculateReturnsNonNullValueIfSumExists() { AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThat(adder.endSum("foo")).isNotNegative(); + adder.addToSum("foo", 6); + assertThat(adder.calculate("foo")).isNotNull(); } @Test @DisplayName("Ending a non-existing sum, causes an IllegalStateException") - public void testEndSumCausesExceptionIfNotExists() + public void testCalculateCausesExceptionIfNotExists() { AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalStateException().isThrownBy(() -> adder.endSum("foo")); + assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo")); } @Test - @DisplayName("Adding to a non-existent sum causes an IllegalStateException") - public void testAddToSumCausesExceptionIfNotExists() + @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException") + public void testAddToSumWithNullValueCausesException() { AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalStateException().isThrownBy(() -> adder.addToSum("foo", 1)); - } - - @Test - @DisplayName("Adding a null-value to an existing sum causes an IllegalArgumentException") - public void testAddSumWithNullValueToExistingSumCausesException() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null)); } @ParameterizedTest(name = "{index}: Adding {0}") - @DisplayName("Adding a non-positive value to an existing sum causes an IllegalArgumentException") + @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException") @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE }) - public void testAddSumWithNonPositiveValueToExistingSumCausesException(int value) + public void testAddToSumWithNonPositiveValueCausesException(int value) { AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value)); } - @Test - @DisplayName("Can add a positive value to an existing sum") - public void testAddSumWithPositiveValuePossibleIfSumExists() + @ParameterizedTest(name = "{index}: Adding {0}") + @DisplayName("Can add a positive value to a sum") + @ValueSource(ints = { 1, 3, 6, 66, 7, 9 }) + public void testAddToSumWithPositiveValuePossible(int value) { AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", -1)); + assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value)); } @ParameterizedTest(name = "{index}: Summing up {0}") @DisplayName("Adds up numbers correctly") @MethodSource("numbersProvider") - public void testAddSumAddsUpNumbersCorrectlyIfSumExists(int... numbers) + public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers) { long expectedResult = Arrays.stream(numbers).sum(); AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number)); - assertThat(adder.endSum("foo")).isEqualTo(expectedResult); + AdderResult result = adder.calculate("foo"); + assertThat(result.number).isEqualTo(numbers[numbers.length-1]); + assertThat(result.sum).isEqualTo(expectedResult); } static Stream numbersProvider() { diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 4ddf8a9..1336050 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -3,82 +3,122 @@ package de.juplo.kafka; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.springframework.beans.factory.annotation.Autowired; +import java.util.*; import java.util.function.Consumer; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThat; + public class ApplicationTests extends GenericApplicationTests { + @Autowired + AdderResults results; + + public ApplicationTests() { - super( - new RecordGenerator() + super(new ApplicationTestRecrodGenerator()); + ((ApplicationTestRecrodGenerator)recordGenerator).tests = this; + } + + + static class ApplicationTestRecrodGenerator implements RecordGenerator + { + ApplicationTests tests; + + final int[] numbers = {1, 7, 3, 2, 33, 6, 11}; + final String[] dieWilden13 = + IntStream + .range(1, 14) + .mapToObj(i -> "seeräuber-" + i) + .toArray(i -> new String[i]); + final StringSerializer stringSerializer = new StringSerializer(); + final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE")); + + int counter = 0; + + Map> state; + + @Override + public int generate( + boolean poisonPills, + boolean logicErrors, + Consumer> messageSender) + { + counter = 0; + state = + Arrays + .stream(dieWilden13) + .collect(Collectors.toMap( + seeräuber -> seeräuber, + seeräuber -> new LinkedList())); + + for (int i = 0; i < 33; i++) + { + String seeräuber = dieWilden13[i % 13]; + int number = numbers[i % 7]; + + Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber)); + + for (int message = 1; message <= number; message++) { - final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 }; - final String[] dieWilden13 = - IntStream - .range(1,14) - .mapToObj(i -> "seeräuber-" + i) - .toArray(i -> new String[i]); - final StringSerializer stringSerializer = new StringSerializer(); - final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START")); - final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END")); - - int counter = 0; - - - @Override - public int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender) - { - counter = 0; - - for (int i = 0; i < 33; i++) - { - String seeräuber = dieWilden13[i%13]; - int number = numbers[i%7]; - - Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber)); - - send(key, startMessage, logicErrors, messageSender); - for (int message = 1; message <= number; message++) - { - Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message))); - send(key, value, logicErrors, messageSender); - } - send(key, endMessage, logicErrors, messageSender); - } - - return counter; - } - - void send( - Bytes key, - Bytes value, - boolean logicErrors, - Consumer> messageSender) - { - counter++; + Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message))); + send(key, value, logicErrors, messageSender); + } + send(key, calculateMessage, logicErrors, messageSender); + + state.get(seeräuber).add(new AdderResult(number, (number + 1) * number / 2)); + } - if (counter == 77) - { - if (logicErrors) - { - value = value.equals(startMessage) ? endMessage : startMessage; - } - } + return counter; + } - messageSender.accept(new ProducerRecord<>(TOPIC, key, value)); - } + void send( + Bytes key, + Bytes value, + boolean logicErrors, + Consumer> messageSender) + { + counter++; - @Override - public boolean canGeneratePoisonPill() + if (counter == 77) + { + if (logicErrors) + { + value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1))); + } + } + + messageSender.accept(new ProducerRecord<>(TOPIC, key, value)); + } + + @Override + public boolean canGeneratePoisonPill() + { + return false; + } + + @Override + public void assertBusinessLogic() + { + tests.results + .getState() + .values() + .stream() + .flatMap(map -> map.entrySet().stream()) + .forEach(entry -> { - return false; - } - }); + String user = entry.getKey(); + List resultsForUser = entry.getValue(); + + assertThat(state.get(user)) + .describedAs("Unexpected results for user {}", user) + .containsExactlyElementsOf(resultsForUser); + }); + } } }