* Die App summiert die übergebenen Zahlen.
* Bei einer CALC-Anweisung, wird die Summe berechnet und ausgegeben.
* Nachrichten mit negativen Zahlen führen zu einem Business-Fehler.
* Die README.sh führt den Fehler vor.
echo 77 | http -v :8080/klaus
# end::nachrichten[]
-echo "Writing poison pill..."
-# tag::poisonpill[]
-echo 'BOOM!' | kafkacat -P -b :9092 -t test
-# end::poisonpill[]
+echo "Writing logic error..."
+# tag::logicerror[]
+echo 66 | http -v :8080/peter?error=1
+# end::logicerror[]
+echo 7 | http -v :8080/klaus
docker-compose -f docker/docker-compose.yml logs -f consumer-1 consumer-2
--- /dev/null
+package de.juplo.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+public class AdderBusinessLogic
+{
+ private final Map<String, AdderResult> state;
+
+
+ public AdderBusinessLogic()
+ {
+ this(new HashMap<>());
+ }
+
+ public AdderBusinessLogic(Map<String, AdderResult> state)
+ {
+ this.state = state;
+ }
+
+
+ public synchronized Optional<Long> getSum(String user)
+ {
+ return Optional.ofNullable(state.get(user)).map(result -> result.sum);
+ }
+
+ public synchronized void addToSum(String user, Integer value)
+ {
+ if (value == null || value < 1)
+ throw new IllegalArgumentException("Not a positive number: " + value);
+
+ long sum =
+ Optional
+ .ofNullable(state.get(user))
+ .map(result -> result.sum)
+ .orElse(0l);
+ state.put(user, new AdderResult(value, sum + value));
+ }
+
+ public synchronized AdderResult calculate(String user)
+ {
+ if (!state.containsKey(user))
+ throw new IllegalStateException("No sumation for " + user + " in progress");
+
+ return state.remove(user);
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode
+public class AdderResult
+{
+ final int number;
+ final long sum;
+
+ @Override
+ public String toString()
+ {
+ return "sum(" + number + ") = " + sum;
+ }
+}
@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
public class ApplicationConfiguration
{
+ @Bean
+ public AdderBusinessLogic adder()
+ {
+ return new AdderBusinessLogic();
+ }
+
+ @Bean
+ public MessageHandler messageHandler(
+ KafkaProperties properties,
+ AdderBusinessLogic adder)
+ {
+ return new MessageHandler(
+ properties.getClientId(),
+ adder);
+ }
+
@Bean
public SimpleConsumer simpleConsumer(
Consumer<String, Message> kafkaConsumer,
+ MessageHandler messageHandler,
KafkaProperties kafkaProperties,
ApplicationProperties applicationProperties)
{
new SimpleConsumer(
kafkaProperties.getClientId(),
applicationProperties.getTopic(),
- kafkaConsumer);
+ kafkaConsumer,
+ messageHandler);
}
@Bean
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
+@RestController
+@RequiredArgsConstructor
+public class ApplicationController
+{
+ private final AdderBusinessLogic adder;
+
+
+ @GetMapping("state/{user}")
+ public ResponseEntity<Long> state(@PathVariable String user)
+ {
+ return adder
+ .getSum(user)
+ .map(sum -> ResponseEntity.ok(sum))
+ .orElseGet(() -> ResponseEntity.notFound().build());
+ }
+
+
+ @ExceptionHandler
+ @ResponseStatus(HttpStatus.BAD_REQUEST)
+ public ErrorResponse illegalStateException(IllegalStateException e)
+ {
+ return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class ErrorResponse
+{
+ private final String error;
+ private final Integer status;
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MessageHandler
+{
+ private final String id;
+
+ private final AdderBusinessLogic adder;
+
+
+ public void addNumber(
+ String user,
+ MessageAddNumber message)
+ {
+ adder.addToSum(user, message.getNext());
+ }
+
+ public void calculateSum(
+ String user,
+ MessageCalculateSum message)
+ {
+ AdderResult result = adder.calculate(user);
+ log.info("{} - New result for {}: {}", id, user, result);
+ }
+
+ public void handle(String user, Message message)
+ {
+ switch(message.getType())
+ {
+ case ADD:
+ addNumber(user, (MessageAddNumber) message);
+ break;
+
+ case CALC:
+ calculateSum(user, (MessageCalculateSum) message);
+ break;
+ }
+ }
+}
private final String id;
private final String topic;
private final Consumer<String, Message> consumer;
+ private final MessageHandler messageHandler;
private long consumed = 0;
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+ messageHandler.handle(key, value);
}
}
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.*;
+
+
+public class AdderBusinessLogicTest
+{
+ @Test
+ @DisplayName("An empty Optional should be returned, for a non-existing sum")
+ public void testGetSumReturnsEmptyOptionalForNonExistingSum()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThat(adder.getSum("foo")).isEmpty();
+ }
+
+ @Test
+ @DisplayName("A non-empty Optional should be returned, for an existing sum")
+ public void testGetSumReturnsNonEmptyOptionalForExistingSum()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ adder.addToSum("foo", 6);
+ assertThat(adder.getSum("foo")).isNotEmpty();
+ }
+
+ @Test
+ @DisplayName("A sum can be calculated, if it does exist")
+ public void testCalculatePossibleIfSumExists()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ adder.addToSum("foo", 6);
+ assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
+ }
+
+ @Test
+ @DisplayName("An existing sum is removed, if ended")
+ public void testCalculateRemovesSumIfSumExists()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ adder.addToSum("foo", 6);
+ adder.calculate("foo");
+ assertThat(adder.getSum("foo")).isEmpty();
+ }
+
+ @Test
+ @DisplayName("An existing sum returns a non-null value, if calculated")
+ public void testCalculateReturnsNonNullValueIfSumExists()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ adder.addToSum("foo", 6);
+ assertThat(adder.calculate("foo")).isNotNull();
+ }
+
+ @Test
+ @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
+ public void testCalculateCausesExceptionIfNotExists()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
+ }
+
+ @Test
+ @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
+ public void testAddToSumWithNullValueCausesException()
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
+ }
+
+ @ParameterizedTest(name = "{index}: Adding {0}")
+ @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
+ @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
+ public void testAddToSumWithNonPositiveValueCausesException(int value)
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
+ }
+
+ @ParameterizedTest(name = "{index}: Adding {0}")
+ @DisplayName("Can add a positive value to a sum")
+ @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
+ public void testAddToSumWithPositiveValuePossible(int value)
+ {
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
+ }
+
+ @ParameterizedTest(name = "{index}: Summing up {0}")
+ @DisplayName("Adds up numbers correctly")
+ @MethodSource("numbersProvider")
+ public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
+ {
+ long expectedResult = Arrays.stream(numbers).sum();
+ AdderBusinessLogic adder = new AdderBusinessLogic();
+ Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
+ AdderResult result = adder.calculate("foo");
+ assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
+ assertThat(result.sum).isEqualTo(expectedResult);
+ }
+
+ static Stream<Arguments> numbersProvider() {
+ return Stream.of(
+ Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
+ Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
+ Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
+ }
+}