Die App führt die Berechnung aus errorhandling/spring-consumer--json--adder---2023-06-signal
authorKai Moritz <kai@juplo.de>
Sat, 19 Nov 2022 08:52:07 +0000 (09:52 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 15 Jun 2023 22:52:13 +0000 (00:52 +0200)
* Die App summiert die übergebenen Zahlen.
* Bei einer CALC-Anweisung, wird die Summe berechnet und ausgegeben.
* Nachrichten mit negativen Zahlen führen zu einem Business-Fehler.
* Die README.sh führt den Fehler vor.

README.sh
src/main/java/de/juplo/kafka/AdderBusinessLogic.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/AdderResult.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ErrorResponse.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/MessageHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/SimpleConsumer.java
src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java [new file with mode: 0644]

index a5a4774..d63d229 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -39,10 +39,11 @@ echo 6  | http -v :8080/peter
 echo 77 | http -v :8080/klaus
 # end::nachrichten[]
 
-echo "Writing poison pill..."
-# tag::poisonpill[]
-echo 'BOOM!' | kafkacat -P -b :9092 -t test
-# end::poisonpill[]
+echo "Writing logic error..."
+# tag::logicerror[]
+echo 66 | http -v :8080/peter?error=1
+# end::logicerror[]
+echo 7  | http -v :8080/klaus
 
 docker-compose -f docker/docker-compose.yml logs -f consumer-1 consumer-2
 
diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java
new file mode 100644 (file)
index 0000000..c0c19a5
--- /dev/null
@@ -0,0 +1,50 @@
+package de.juplo.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+public class AdderBusinessLogic
+{
+  private final Map<String, AdderResult> state;
+
+
+  public AdderBusinessLogic()
+  {
+    this(new HashMap<>());
+  }
+
+  public AdderBusinessLogic(Map<String, AdderResult> state)
+  {
+    this.state = state;
+  }
+
+
+  public synchronized Optional<Long> getSum(String user)
+  {
+    return Optional.ofNullable(state.get(user)).map(result -> result.sum);
+  }
+
+  public synchronized void addToSum(String user, Integer value)
+  {
+    if (value == null || value < 1)
+      throw new IllegalArgumentException("Not a positive number: " + value);
+
+    long sum =
+        Optional
+            .ofNullable(state.get(user))
+            .map(result -> result.sum)
+            .orElse(0l);
+    state.put(user, new AdderResult(value, sum + value));
+  }
+
+  public synchronized AdderResult calculate(String user)
+  {
+    if (!state.containsKey(user))
+      throw new IllegalStateException("No sumation for " + user + " in progress");
+
+    return state.remove(user);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java
new file mode 100644 (file)
index 0000000..44b7da8
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode
+public class AdderResult
+{
+  final int number;
+  final long sum;
+
+  @Override
+  public String toString()
+  {
+    return "sum(" + number + ") = " + sum;
+  }
+}
index a8b3e1d..d292dbc 100644 (file)
@@ -13,9 +13,26 @@ import org.springframework.kafka.core.ConsumerFactory;
 @EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class ApplicationConfiguration
 {
+  @Bean
+  public AdderBusinessLogic adder()
+  {
+    return new AdderBusinessLogic();
+  }
+
+  @Bean
+  public MessageHandler messageHandler(
+    KafkaProperties properties,
+    AdderBusinessLogic adder)
+  {
+    return new MessageHandler(
+      properties.getClientId(),
+      adder);
+  }
+
   @Bean
   public SimpleConsumer simpleConsumer(
       Consumer<String, Message> kafkaConsumer,
+      MessageHandler messageHandler,
       KafkaProperties kafkaProperties,
       ApplicationProperties applicationProperties)
   {
@@ -23,7 +40,8 @@ public class ApplicationConfiguration
         new SimpleConsumer(
             kafkaProperties.getClientId(),
             applicationProperties.getTopic(),
-            kafkaConsumer);
+            kafkaConsumer,
+            messageHandler);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationController.java b/src/main/java/de/juplo/kafka/ApplicationController.java
new file mode 100644 (file)
index 0000000..79d42e9
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
+@RestController
+@RequiredArgsConstructor
+public class ApplicationController
+{
+  private final AdderBusinessLogic adder;
+
+
+  @GetMapping("state/{user}")
+  public ResponseEntity<Long> state(@PathVariable String user)
+  {
+    return adder
+      .getSum(user)
+      .map(sum -> ResponseEntity.ok(sum))
+      .orElseGet(() -> ResponseEntity.notFound().build());
+  }
+
+
+  @ExceptionHandler
+  @ResponseStatus(HttpStatus.BAD_REQUEST)
+  public ErrorResponse illegalStateException(IllegalStateException e)
+  {
+    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
new file mode 100644 (file)
index 0000000..5ca206d
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class ErrorResponse
+{
+  private final String error;
+  private final Integer status;
+}
diff --git a/src/main/java/de/juplo/kafka/MessageHandler.java b/src/main/java/de/juplo/kafka/MessageHandler.java
new file mode 100644 (file)
index 0000000..2f58f65
--- /dev/null
@@ -0,0 +1,44 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MessageHandler
+{
+  private final String id;
+
+  private final AdderBusinessLogic adder;
+
+
+  public void addNumber(
+      String user,
+      MessageAddNumber message)
+  {
+    adder.addToSum(user, message.getNext());
+  }
+
+  public void calculateSum(
+      String user,
+      MessageCalculateSum message)
+  {
+    AdderResult result = adder.calculate(user);
+    log.info("{} - New result for {}: {}", id, user, result);
+  }
+
+  public void handle(String user, Message message)
+  {
+    switch(message.getType())
+    {
+      case ADD:
+        addNumber(user, (MessageAddNumber) message);
+        break;
+
+      case CALC:
+        calculateSum(user, (MessageCalculateSum) message);
+        break;
+    }
+  }
+}
index 45f9b94..cea9568 100644 (file)
@@ -19,6 +19,7 @@ public class SimpleConsumer implements Callable<Integer>
   private final String id;
   private final String topic;
   private final Consumer<String, Message> consumer;
+  private final MessageHandler messageHandler;
 
   private long consumed = 0;
 
@@ -76,5 +77,6 @@ public class SimpleConsumer implements Callable<Integer>
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+    messageHandler.handle(key, value);
   }
 }
diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java
new file mode 100644 (file)
index 0000000..8e49263
--- /dev/null
@@ -0,0 +1,117 @@
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.*;
+
+
+public class AdderBusinessLogicTest
+{
+  @Test
+  @DisplayName("An empty Optional should be returned, for a non-existing sum")
+  public void testGetSumReturnsEmptyOptionalForNonExistingSum()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThat(adder.getSum("foo")).isEmpty();
+  }
+
+  @Test
+  @DisplayName("A non-empty Optional should be returned, for an existing sum")
+  public void testGetSumReturnsNonEmptyOptionalForExistingSum()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    adder.addToSum("foo", 6);
+    assertThat(adder.getSum("foo")).isNotEmpty();
+  }
+
+  @Test
+  @DisplayName("A sum can be calculated, if it does exist")
+  public void testCalculatePossibleIfSumExists()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    adder.addToSum("foo", 6);
+    assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
+  }
+
+  @Test
+  @DisplayName("An existing sum is removed, if ended")
+  public void testCalculateRemovesSumIfSumExists()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    adder.addToSum("foo", 6);
+    adder.calculate("foo");
+    assertThat(adder.getSum("foo")).isEmpty();
+  }
+
+  @Test
+  @DisplayName("An existing sum returns a non-null value, if calculated")
+  public void testCalculateReturnsNonNullValueIfSumExists()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    adder.addToSum("foo", 6);
+    assertThat(adder.calculate("foo")).isNotNull();
+  }
+
+  @Test
+  @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
+  public void testCalculateCausesExceptionIfNotExists()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
+  }
+
+  @Test
+  @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
+  public void testAddToSumWithNullValueCausesException()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
+  }
+
+  @ParameterizedTest(name = "{index}: Adding {0}")
+  @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
+  @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
+  public void testAddToSumWithNonPositiveValueCausesException(int value)
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
+  }
+
+  @ParameterizedTest(name = "{index}: Adding {0}")
+  @DisplayName("Can add a positive value to a sum")
+  @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
+  public void testAddToSumWithPositiveValuePossible(int value)
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
+  }
+
+  @ParameterizedTest(name = "{index}: Summing up {0}")
+  @DisplayName("Adds up numbers correctly")
+  @MethodSource("numbersProvider")
+  public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
+  {
+    long expectedResult = Arrays.stream(numbers).sum();
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
+    AdderResult result = adder.calculate("foo");
+    assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
+    assertThat(result.sum).isEqualTo(expectedResult);
+  }
+
+  static Stream<Arguments> numbersProvider() {
+    return Stream.of(
+        Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
+        Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
+        Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
+  }
+}