From: Kai Moritz Date: Sat, 19 Nov 2022 08:52:07 +0000 (+0100) Subject: Die App führt die Berechnung aus X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Ferrorhandling%2Fspring-consumer--json--adder;p=demos%2Fkafka%2Ftraining Die App führt die Berechnung aus * Die App summiert die übergebenen Zahlen. * Bei einer CALC-Anweisung, wird die Summe berechnet und ausgegeben. * Nachrichten mit negativen Zahlen führen zu einem Business-Fehler. * Die README.sh führt den Fehler vor. --- diff --git a/README.sh b/README.sh index a5a4774..d63d229 100755 --- a/README.sh +++ b/README.sh @@ -39,10 +39,11 @@ echo 6 | http -v :8080/peter echo 77 | http -v :8080/klaus # end::nachrichten[] -echo "Writing poison pill..." -# tag::poisonpill[] -echo 'BOOM!' | kafkacat -P -b :9092 -t test -# end::poisonpill[] +echo "Writing logic error..." +# tag::logicerror[] +echo 66 | http -v :8080/peter?error=1 +# end::logicerror[] +echo 7 | http -v :8080/klaus docker-compose -f docker/docker-compose.yml logs -f consumer-1 consumer-2 diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java new file mode 100644 index 0000000..c0c19a5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java @@ -0,0 +1,50 @@ +package de.juplo.kafka; + + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + + +public class AdderBusinessLogic +{ + private final Map state; + + + public AdderBusinessLogic() + { + this(new HashMap<>()); + } + + public AdderBusinessLogic(Map state) + { + this.state = state; + } + + + public synchronized Optional getSum(String user) + { + return Optional.ofNullable(state.get(user)).map(result -> result.sum); + } + + public synchronized void addToSum(String user, Integer value) + { + if (value == null || value < 1) + throw new IllegalArgumentException("Not a positive number: " + value); + + long sum = + Optional + .ofNullable(state.get(user)) + .map(result -> result.sum) + .orElse(0l); + state.put(user, new AdderResult(value, sum + value)); + } + + public synchronized AdderResult calculate(String user) + { + if (!state.containsKey(user)) + throw new IllegalStateException("No sumation for " + user + " in progress"); + + return state.remove(user); + } +} diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java new file mode 100644 index 0000000..44b7da8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/AdderResult.java @@ -0,0 +1,21 @@ +package de.juplo.kafka; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +@Getter +@EqualsAndHashCode +public class AdderResult +{ + final int number; + final long sum; + + @Override + public String toString() + { + return "sum(" + number + ") = " + sum; + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a8b3e1d..d292dbc 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -13,9 +13,26 @@ import org.springframework.kafka.core.ConsumerFactory; @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class ApplicationConfiguration { + @Bean + public AdderBusinessLogic adder() + { + return new AdderBusinessLogic(); + } + + @Bean + public MessageHandler messageHandler( + KafkaProperties properties, + AdderBusinessLogic adder) + { + return new MessageHandler( + properties.getClientId(), + adder); + } + @Bean public SimpleConsumer simpleConsumer( Consumer kafkaConsumer, + MessageHandler messageHandler, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { @@ -23,7 +40,8 @@ public class ApplicationConfiguration new SimpleConsumer( kafkaProperties.getClientId(), applicationProperties.getTopic(), - kafkaConsumer); + kafkaConsumer, + messageHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationController.java b/src/main/java/de/juplo/kafka/ApplicationController.java new file mode 100644 index 0000000..79d42e9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationController.java @@ -0,0 +1,36 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + + +@RestController +@RequiredArgsConstructor +public class ApplicationController +{ + private final AdderBusinessLogic adder; + + + @GetMapping("state/{user}") + public ResponseEntity state(@PathVariable String user) + { + return adder + .getSum(user) + .map(sum -> ResponseEntity.ok(sum)) + .orElseGet(() -> ResponseEntity.notFound().build()); + } + + + @ExceptionHandler + @ResponseStatus(HttpStatus.BAD_REQUEST) + public ErrorResponse illegalStateException(IllegalStateException e) + { + return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); + } +} diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java new file mode 100644 index 0000000..5ca206d --- /dev/null +++ b/src/main/java/de/juplo/kafka/ErrorResponse.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class ErrorResponse +{ + private final String error; + private final Integer status; +} diff --git a/src/main/java/de/juplo/kafka/MessageHandler.java b/src/main/java/de/juplo/kafka/MessageHandler.java new file mode 100644 index 0000000..2f58f65 --- /dev/null +++ b/src/main/java/de/juplo/kafka/MessageHandler.java @@ -0,0 +1,44 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@RequiredArgsConstructor +@Slf4j +public class MessageHandler +{ + private final String id; + + private final AdderBusinessLogic adder; + + + public void addNumber( + String user, + MessageAddNumber message) + { + adder.addToSum(user, message.getNext()); + } + + public void calculateSum( + String user, + MessageCalculateSum message) + { + AdderResult result = adder.calculate(user); + log.info("{} - New result for {}: {}", id, user, result); + } + + public void handle(String user, Message message) + { + switch(message.getType()) + { + case ADD: + addNumber(user, (MessageAddNumber) message); + break; + + case CALC: + calculateSum(user, (MessageCalculateSum) message); + break; + } + } +} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 45f9b94..cea9568 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -19,6 +19,7 @@ public class SimpleConsumer implements Callable private final String id; private final String topic; private final Consumer consumer; + private final MessageHandler messageHandler; private long consumed = 0; @@ -76,5 +77,6 @@ public class SimpleConsumer implements Callable { consumed++; log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); + messageHandler.handle(key, value); } } diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java new file mode 100644 index 0000000..8e49263 --- /dev/null +++ b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java @@ -0,0 +1,117 @@ +package de.juplo.kafka; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.*; + + +public class AdderBusinessLogicTest +{ + @Test + @DisplayName("An empty Optional should be returned, for a non-existing sum") + public void testGetSumReturnsEmptyOptionalForNonExistingSum() + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + assertThat(adder.getSum("foo")).isEmpty(); + } + + @Test + @DisplayName("A non-empty Optional should be returned, for an existing sum") + public void testGetSumReturnsNonEmptyOptionalForExistingSum() + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + adder.addToSum("foo", 6); + assertThat(adder.getSum("foo")).isNotEmpty(); + } + + @Test + @DisplayName("A sum can be calculated, if it does exist") + public void testCalculatePossibleIfSumExists() + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + adder.addToSum("foo", 6); + assertThatNoException().isThrownBy(() -> adder.calculate("foo")); + } + + @Test + @DisplayName("An existing sum is removed, if ended") + public void testCalculateRemovesSumIfSumExists() + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + adder.addToSum("foo", 6); + adder.calculate("foo"); + assertThat(adder.getSum("foo")).isEmpty(); + } + + @Test + @DisplayName("An existing sum returns a non-null value, if calculated") + public void testCalculateReturnsNonNullValueIfSumExists() + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + adder.addToSum("foo", 6); + assertThat(adder.calculate("foo")).isNotNull(); + } + + @Test + @DisplayName("Ending a non-existing sum, causes an IllegalStateException") + public void testCalculateCausesExceptionIfNotExists() + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo")); + } + + @Test + @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException") + public void testAddToSumWithNullValueCausesException() + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null)); + } + + @ParameterizedTest(name = "{index}: Adding {0}") + @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException") + @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE }) + public void testAddToSumWithNonPositiveValueCausesException(int value) + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value)); + } + + @ParameterizedTest(name = "{index}: Adding {0}") + @DisplayName("Can add a positive value to a sum") + @ValueSource(ints = { 1, 3, 6, 66, 7, 9 }) + public void testAddToSumWithPositiveValuePossible(int value) + { + AdderBusinessLogic adder = new AdderBusinessLogic(); + assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value)); + } + + @ParameterizedTest(name = "{index}: Summing up {0}") + @DisplayName("Adds up numbers correctly") + @MethodSource("numbersProvider") + public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers) + { + long expectedResult = Arrays.stream(numbers).sum(); + AdderBusinessLogic adder = new AdderBusinessLogic(); + Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number)); + AdderResult result = adder.calculate("foo"); + assertThat(result.number).isEqualTo(numbers[numbers.length-1]); + assertThat(result.sum).isEqualTo(expectedResult); + } + + static Stream numbersProvider() { + return Stream.of( + Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()), + Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()), + Arguments.of((Object) IntStream.rangeClosed(1,66).toArray())); + } +}