Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets wip-merge-deserialization--sumup-adder--ohne-stored-offsets
author    Kai Moritz <kai@juplo.de>
          Fri, 9 Sep 2022 11:43:58 +0000 (13:43 +0200)
committer Kai Moritz <kai@juplo.de>
          Fri, 9 Sep 2022 11:43:58 +0000 (13:43 +0200)
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/main/java/de/juplo/kafka/EndlessConsumer.java
** src/test/java/de/juplo/kafka/ApplicationIT.java
** src/test/java/de/juplo/kafka/ApplicationTests.java
** src/test/java/de/juplo/kafka/GenericApplicationTests.java
** src/test/java/de/juplo/kafka/TestRecordHandler.java

22 files changed:
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/AdderBusinessLogic.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/AdderResult.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/AdderResults.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/RebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/StateDocument.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/StateRepository.java [new file with mode: 0644]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ApplicationIT.java
src/test/java/de/juplo/kafka/ApplicationTests.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 2a1e5d8..6be4b11 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/sumup-adder:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -9,14 +9,17 @@ then
   exit
 fi
 
-docker-compose up -d zookeeper kafka cli
+docker-compose rm -svf adder-1 adder-2
+docker-compose rm -svf mongo
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
   "$1" = "build"
 ]]
 then
-  mvn install || exit
+  docker-compose rm -svf adder-1 adder-2
+  mvn -D skipTests clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
@@ -25,44 +28,89 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d
+docker-compose up -d gateway requests-1 requests-2 adder-1 adder-2
 
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
-while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done
-while ! [[ $(http 0:8083/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-3..."; sleep 1; done
-while ! [[ $(http 0:8084/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-4..."; sleep 1; done
-while ! [[ $(http 0:8085/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-5..."; sleep 1; done
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done
+while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
 
-sleep 5
+docker-compose up -d peter klaus
 
-docker-compose exec -T cli bash << 'EOF'
-echo "Writing poison pill into topic test..."
-# tag::poisonpill[]
-echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test
-# end::poisonpill[]
-EOF
+while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
+http -v --pretty none -S :8091/results
+echo
+while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
+http -v --pretty none -S :8092/results
+echo
+sleep 3
+echo "Resultate für adder-1"
+http -v --pretty none -S :8091/results
+echo
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
 
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-1 is still running..."; sleep 1; done
-while [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-2 is still running..."; sleep 1; done
-while [[ $(http 0:8083/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-3 is still running..."; sleep 1; done
-while [[ $(http 0:8084/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-4 is still running..."; sleep 1; done
-while [[ $(http 0:8085/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-5 is still running..."; sleep 1; done
+docker-compose stop adder-1
+sleep 1
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
 
-http -v :8081/actuator/health
-echo "Restarting consumer-1"
-http -v post :8081/start
+docker-compose stop adder-2
+docker-compose start adder-1
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
+echo "Resultate für adder-1"
+http -v --pretty none -S :8091/results
+echo
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
 
-echo "Waiting for consumer-1 to come up"
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
-http -v :8081/actuator/health
+docker-compose start adder-2
+while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
+while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
+echo "Resultate für adder-1"
+http -v --pretty none -S :8091/results
+echo
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
 
-echo "Waiting for consumer-1 to crash"
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-1 is still running..."; sleep 1; done
-http -v :8081/actuator/health
+docker-compose kill -s 9 adder-1
 
-docker-compose stop producer
-docker-compose logs --tail=10 consumer-1
-docker-compose logs --tail=10 consumer-2
-docker-compose logs --tail=10 consumer-3
-docker-compose logs --tail=10 consumer-4
-docker-compose logs --tail=10 consumer-5
+docker-compose start adder-1
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
+docker-compose kill -s 9 peter klaus
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
index d36e851..c46b00d 100644 (file)
@@ -7,88 +7,167 @@ services:
     ports:
       - 2181:2181
 
-  kafka:
+  kafka-1:
     image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9081:9081
+    depends_on:
+      - zookeeper
+
+  kafka-2:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     ports:
       - 9092:9082
       - 9082:9082
+    networks:
+      default:
+        aliases:
+          - kafka
+    depends_on:
+      - zookeeper
+
+  kafka-3:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9083:9083
     depends_on:
       - zookeeper
 
+  mongo:
+    image: mongo:4.4.13
+    ports:
+      - 27017:27017
+    environment:
+      MONGO_INITDB_ROOT_USERNAME: juplo
+      MONGO_INITDB_ROOT_PASSWORD: training
+
+  express:
+    image: mongo-express
+    ports:
+      - 8090:8081
+    environment:
+      ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
+      ME_CONFIG_MONGODB_ADMINPASSWORD: training
+      ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+    depends_on:
+      - mongo
+
   setup:
     image: juplo/toolbox
     command: >
       bash -c "
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
+        kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2  
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic in
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic out
       "
 
   cli:
     image: juplo/toolbox
     command: sleep infinity
 
-  producer:
-    image: juplo/endless-long-producer:1.0-SNAPSHOT
+  gateway:
+    image: juplo/sumup-gateway:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
       server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer
-      producer.topic: test
-      producer.throttle-ms: 200
-
+      sumup.gateway.bootstrap-server: kafka:9092
+      sumup.gateway.client-id: gateway
+      sumup.gateway.topic: in
 
-  consumer-1:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+  requests-1:
+    image: juplo/sumup-requests:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
       server.port: 8080
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: consumer-1
+      sumup.requests.bootstrap-server: kafka:9092
+      sumup.requests.client-id: requests-1
 
-  consumer-2:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+  requests-2:
+    image: juplo/sumup-requests:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
       server.port: 8080
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: consumer-2
+      sumup.requests.bootstrap-server: kafka:9092
+      sumup.requests.client-id: requests-2
 
-  consumer-3:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+  adder-1:
+    image: juplo/sumup-adder:1.0-SNAPSHOT
     ports:
-      - 8083:8080
+      - 8091:8080
     environment:
       server.port: 8080
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: consumer-3
+      sumup.adder.bootstrap-server: kafka:9092
+      sumup.adder.client-id: adder-1
+      sumup.adder.commit-interval: 3s
+      sumup.adder.throttle: 3ms
+      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+      spring.data.mongodb.database: juplo
+      logging.level.org.apache.kafka.clients.consumer: DEBUG
 
-  consumer-4:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+  adder-2:
+    image: juplo/sumup-adder:1.0-SNAPSHOT
     ports:
-      - 8084:8080
+      - 8092:8080
     environment:
       server.port: 8080
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: consumer-4
+      sumup.adder.bootstrap-server: kafka:9092
+      sumup.adder.client-id: adder-2
+      sumup.adder.commit-interval: 3s
+      sumup.adder.throttle: 3ms
+      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+      spring.data.mongodb.database: juplo
+      logging.level.org.apache.kafka.clients.consumer: DEBUG
 
-  consumer-5:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
-    ports:
-      - 8085:8080
-    environment:
-      server.port: 8080
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: consumer-5
+  peter:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo 666 | http -v gateway:8080/peter;
+        sleep 1;
+      done
+      "
+  klaus:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo 666 | http -v gateway:8080/klaus;
+        sleep 1;
+      done
+      "
diff --git a/pom.xml b/pom.xml
index e664a07..6699408 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-consumer</artifactId>
+  <artifactId>sumup-adder</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
+  <name>SumUp Adder</name>
+  <description>Calculates the sum for the sent messages</description>
 
   <properties>
     <java.version>11</java.version>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-mongodb</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-validation</artifactId>
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>de.flapdoodle.embed</groupId>
+      <artifactId>de.flapdoodle.embed.mongo</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java
new file mode 100644 (file)
index 0000000..d525182
--- /dev/null
@@ -0,0 +1,55 @@
+package de.juplo.kafka;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+public class AdderBusinessLogic
+{
+  private final Map<String, AdderResult> state;
+
+
+  public AdderBusinessLogic()
+  {
+    this(new HashMap<>());
+  }
+
+  public AdderBusinessLogic(Map<String, AdderResult> state)
+  {
+    this.state = state;
+  }
+
+
+  public synchronized Optional<Long> getSum(String user)
+  {
+    return Optional.ofNullable(state.get(user)).map(result -> result.sum);
+  }
+
+  public synchronized void addToSum(String user, Integer value)
+  {
+    if (value == null || value < 1)
+      throw new IllegalArgumentException("Not a positive number: " + value);
+
+    long sum =
+        Optional
+            .ofNullable(state.get(user))
+            .map(result -> result.sum)
+            .orElse(0L);
+    state.put(user, new AdderResult(value, sum + value));
+  }
+
+  public synchronized AdderResult calculate(String user)
+  {
+    if (!state.containsKey(user))
+      throw new IllegalStateException("No sumation for " + user + " in progress");
+
+    return state.remove(user);
+  }
+
+  protected Map<String, AdderResult> getState()
+  {
+    return state;
+  }
+}
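
A quick usage sketch for the contract above (illustration only, not part of the commit; user and values are made up): addToSum accumulates positive values per user, getSum exposes the running total, and calculate returns the final result and clears the user's state.

  // Illustration only - mirrors the methods of AdderBusinessLogic above.
  AdderBusinessLogic adder = new AdderBusinessLogic();
  adder.addToSum("peter", 1);
  adder.addToSum("peter", 2);
  adder.addToSum("peter", 3);
  adder.getSum("peter");                         // Optional[6]
  AdderResult result = adder.calculate("peter"); // result.toString(): "sum(3) = 6"
  adder.getSum("peter");                         // Optional.empty() - state was removed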
diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java
new file mode 100644 (file)
index 0000000..44b7da8
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+@Getter
+@EqualsAndHashCode
+public class AdderResult
+{
+  final int number;
+  final long sum;
+
+  @Override
+  public String toString()
+  {
+    return "sum(" + number + ") = " + sum;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java
new file mode 100644 (file)
index 0000000..e7f5602
--- /dev/null
@@ -0,0 +1,47 @@
+package de.juplo.kafka;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+public class AdderResults
+{
+  private final Map<Integer, Map<String, List<AdderResult>>> results = new HashMap<>();
+
+
+  public void addResults(Integer partition, String user, AdderResult result)
+  {
+    Map<String, List<AdderResult>> resultsByUser = this.results.get(partition);
+
+    List<AdderResult> results = resultsByUser.get(user);
+    if (results == null)
+    {
+      results = new LinkedList<>();
+      resultsByUser.put(user, results);
+    }
+
+    results.add(result);
+  }
+
+  protected void addPartition(Integer partition, Map<String, List<AdderResult>> results)
+  {
+    this.results.put(partition, results);
+  }
+
+  protected Map<String, List<AdderResult>> removePartition(Integer partition)
+  {
+    return this.results.remove(partition);
+  }
+
+  public Map<Integer, Map<String, List<AdderResult>>> getState()
+  {
+    return results;
+  }
+
+  public Map<String, List<AdderResult>> getState(Integer partition)
+  {
+    return results.get(partition);
+  }
+}
index b5bd1b9..76c2520 100644 (file)
@@ -8,7 +8,6 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 import javax.annotation.PreDestroy;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -31,8 +30,22 @@ public class Application implements ApplicationRunner
   }
 
   @PreDestroy
-  public void stopExecutor()
+  public void shutdown()
   {
+    try
+    {
+      log.info("Stopping EndlessConsumer");
+      endlessConsumer.stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("Was already stopped: {}", e.toString());
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
+    }
+
     try
     {
       log.info("Shutting down the ExecutorService.");
@@ -42,7 +55,7 @@ public class Application implements ApplicationRunner
     }
     catch (InterruptedException e)
     {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
+      log.error("Exception while waiting for the termination of the ExecutorService: {}", e);
     }
     finally
     {
index 653e81e..e08cff4 100644 (file)
@@ -1,13 +1,13 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -18,19 +18,44 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public RecordHandler<String, Long> applicationRecordHandler()
+  public ApplicationRecordHandler recordHandler(
+      AdderResults adderResults,
+      ApplicationProperties properties)
+  {
+    return new ApplicationRecordHandler(
+        adderResults,
+        Optional.ofNullable(properties.getThrottle()),
+        properties.getClientId());
+  }
+
+  @Bean
+  public AdderResults adderResults()
+  {
+    return new AdderResults();
+  }
+
+  @Bean
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
+      AdderResults adderResults,
+      StateRepository stateRepository,
+      Consumer<String, String> consumer,
+      ApplicationProperties properties)
   {
-    return (record) ->
-    {
-      // Handle record
-    };
+    return new ApplicationRebalanceListener(
+        recordHandler,
+        adderResults,
+        stateRepository,
+        properties.getClientId(),
+        consumer);
   }
 
   @Bean
-  public EndlessConsumer<String, Long> endlessConsumer(
-      KafkaConsumer<String, Long> kafkaConsumer,
+  public EndlessConsumer<String, String> endlessConsumer(
+      KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      RecordHandler recordHandler,
+      ApplicationRebalanceListener rebalanceListener,
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -39,6 +64,7 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
+            rebalanceListener,
             recordHandler);
   }
 
@@ -49,18 +75,19 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
     props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", LongDeserializer.class.getName());
+    props.put("value.deserializer", StringDeserializer.class.getName());
 
     return new KafkaConsumer<>(props);
   }
index dc3a26e..df4e653 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Long> consumer;
+  private final EndlessConsumer<String, String> consumer;
 
 
   @Override
index 14e928f..f852c00 100644 (file)
@@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull;
 import java.time.Duration;
 
 
-@ConfigurationProperties(prefix = "consumer")
+@ConfigurationProperties(prefix = "sumup.adder")
 @Validated
 @Getter
 @Setter
@@ -33,4 +33,5 @@ public class ApplicationProperties
   private String autoOffsetReset;
   @NotNull
   private Duration commitInterval;
+  private Duration throttle;
 }
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
new file mode 100644 (file)
index 0000000..63d57df
--- /dev/null
@@ -0,0 +1,98 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements RebalanceListener
+{
+  private final ApplicationRecordHandler recordHandler;
+  private final AdderResults adderResults;
+  private final StateRepository stateRepository;
+  private final String id;
+  private final Consumer consumer;
+
+  private final Set<Integer> partitions = new HashSet<>();
+
+  private boolean commitsEnabled = true;
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      log.info("{} - adding partition: {}", id, partition);
+      this.partitions.add(partition);
+      StateDocument document =
+          stateRepository
+              .findById(Integer.toString(partition))
+              .orElse(new StateDocument(partition));
+      recordHandler.addPartition(partition, document.state);
+      for (String user : document.state.keySet())
+      {
+        log.info(
+            "{} - Restored state for partition={}|user={}: {}",
+            id,
+            partition,
+            user,
+            document.state.get(user));
+      }
+      adderResults.addPartition(partition, document.results);
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    if (commitsEnabled)
+    {
+      log.info("{} - Commiting offsets for all previously assigned partitions", id);
+      try
+      {
+        consumer.commitSync();
+      }
+      catch (Exception e)
+      {
+        log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+      }
+    }
+
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      log.info("{} - removing partition: {}", id, partition);
+      this.partitions.remove(partition);
+      Map<String, AdderResult> state = recordHandler.removePartition(partition);
+      for (String user : state.keySet())
+      {
+        log.info(
+            "{} - Saved state for partition={}|user={}: {}",
+            id,
+            partition,
+            user,
+            state.get(user));
+      }
+      Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
+      stateRepository.save(new StateDocument(partition, state, results));
+    });
+  }
+
+  @Override
+  public void enableCommits()
+  {
+    commitsEnabled = true;
+  }
+
+  @Override
+  public void disableCommits()
+  {
+    commitsEnabled = false;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
new file mode 100644 (file)
index 0000000..51d524f
--- /dev/null
@@ -0,0 +1,75 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, String>
+{
+  private final AdderResults results;
+  private final Optional<Duration> throttle;
+  private final String id;
+
+  private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+
+
+  @Override
+  public void accept(ConsumerRecord<String, String> record)
+  {
+    Integer partition = record.partition();
+    String user = record.key();
+    String message = record.value();
+
+    if (message.equals("CALCULATE"))
+    {
+      AdderResult result = state.get(partition).calculate(user);
+      log.info("{} - New result for {}: {}", id, user, result);
+      results.addResults(partition, user, result);
+    }
+    else
+    {
+      state.get(partition).addToSum(user, Integer.parseInt(message));
+    }
+
+    if (throttle.isPresent())
+    {
+      try
+      {
+        Thread.sleep(throttle.get().toMillis());
+      }
+      catch (InterruptedException e)
+      {
+        log.warn("{} - Intrerrupted while throttling: {}", id, e);
+      }
+    }
+  }
+
+  protected void addPartition(Integer partition, Map<String, AdderResult> state)
+  {
+    this.state.put(partition, new AdderBusinessLogic(state));
+  }
+
+  protected Map<String, AdderResult> removePartition(Integer partition)
+  {
+    return this.state.remove(partition).getState();
+  }
+
+
+  public Map<Integer, AdderBusinessLogic> getState()
+  {
+    return state;
+  }
+
+  public AdderBusinessLogic getState(Integer partition)
+  {
+    return state.get(partition);
+  }
+}
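
To make the message contract concrete: the record key carries the user, the value is either a number to add or the literal CALCULATE. A sketch with made-up records (topic name, offsets, and the DEV client-id are assumptions; both the handler and AdderResults must have the partition registered first, as the rebalance listener does):

  // Illustration only - feeding two hypothetical records into the handler.
  AdderResults adderResults = new AdderResults();
  adderResults.addPartition(0, new HashMap<>());
  ApplicationRecordHandler handler =
      new ApplicationRecordHandler(adderResults, Optional.empty(), "DEV");
  handler.addPartition(0, new HashMap<>());
  handler.accept(new ConsumerRecord<>("out", 0, 0L, "peter", "6"));         // adds 6 to peter's sum
  handler.accept(new ConsumerRecord<>("out", 0, 1L, "peter", "CALCULATE")); // closes the sum
  adderResults.getState(0).get("peter");                                    // [sum(6) = 6]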
index ed38080..26a5bc8 100644 (file)
@@ -2,14 +2,14 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 
 @RestController
@@ -17,6 +17,8 @@ import java.util.concurrent.ExecutionException;
 public class DriverController
 {
   private final EndlessConsumer consumer;
+  private final ApplicationRecordHandler recordHandler;
+  private final AdderResults results;
 
 
   @PostMapping("start")
@@ -32,10 +34,49 @@ public class DriverController
   }
 
 
-  @GetMapping("seen")
-  public Map<Integer, Map<String, Long>> seen()
+  @GetMapping("state")
+  public Map<Integer, Map<String, AdderResult>> state()
   {
-    return consumer.getSeen();
+    return
+        recordHandler
+            .getState()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                entry -> entry.getKey(),
+                entry -> entry.getValue().getState()));
+  }
+
+  @GetMapping("state/{user}")
+  public ResponseEntity<Long> state(@PathVariable String user)
+  {
+    for (AdderBusinessLogic adder : recordHandler.getState().values())
+    {
+      Optional<Long> sum = adder.getSum(user);
+      if (sum.isPresent())
+        return ResponseEntity.ok(sum.get());
+    }
+
+    return ResponseEntity.notFound().build();
+  }
+
+  @GetMapping("results")
+  public Map<Integer, Map<String, List<AdderResult>>> results()
+  {
+    return results.getState();
+  }
+
+  @GetMapping("results/{user}")
+  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
+  {
+    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
+    {
+      List<AdderResult> results = resultsByUser.get(user);
+      if (results != null)
+        return ResponseEntity.ok(results);
+    }
+
+    return ResponseEntity.notFound().build();
   }
 
 
index 788a4a7..c3ed7c3 100644 (file)
@@ -3,7 +3,6 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -22,13 +21,14 @@ import java.util.concurrent.locks.ReentrantLock;
 
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer<K, V> implements Runnable
 {
   private final ExecutorService executor;
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final RecordHandler handler;
+  private final RebalanceListener rebalanceListener;
+  private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -36,50 +36,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private Exception exception;
   private long consumed = 0;
 
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> offsets = new HashMap<>();
-
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long newOffset = consumer.position(tp);
-      Long oldOffset = offsets.remove(partition);
-      log.info(
-          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
-          id,
-          partition,
-          newOffset - oldOffset,
-          oldOffset,
-          newOffset);
-      Map<String, Long> removed = seen.remove(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
-    });
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long offset = consumer.position(tp);
-      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-      offsets.put(partition, offset);
-      seen.put(partition, new HashMap<>());
-    });
-  }
 
 
   @Override
@@ -88,7 +44,8 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), this);
+      rebalanceListener.enableCommits();
+      consumer.subscribe(Arrays.asList(topic), rebalanceListener);
 
       while (true)
       {
@@ -109,27 +66,15 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
               record.value()
           );
 
-          handler.accept(record);
+          recordHandler.accept(record);
 
           consumed++;
-
-          Integer partition = record.partition();
-          String key = record.key() == null ? "NULL" : record.key().toString();
-          Map<String, Long> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0l);
-
-          long seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
         }
       }
     }
     catch(WakeupException e)
     {
-      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
+      log.info("{} - RIIING! Request to stop consumption.", id);
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -143,12 +88,12 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           offset,
           e.getCause().toString());
 
-      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)
     {
-      log.error("{} - Unexpected error: {}", id, e.toString(), e);
+      log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
+      rebalanceListener.disableCommits();
       shutdown(e);
     }
     finally
@@ -193,11 +138,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     }
   }
 
-  public Map<Integer, Map<String, Long>> getSeen()
-  {
-    return seen;
-  }
-
   public void start()
   {
     lock.lock();
@@ -217,7 +157,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     }
   }
 
-  public synchronized void stop() throws ExecutionException, InterruptedException
+  public synchronized void stop() throws InterruptedException
   {
     lock.lock();
     try
@@ -240,22 +180,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   public void destroy() throws ExecutionException, InterruptedException
   {
     log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
+    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
   }
 
   public boolean running()
diff --git a/src/main/java/de/juplo/kafka/RebalanceListener.java b/src/main/java/de/juplo/kafka/RebalanceListener.java
new file mode 100644 (file)
index 0000000..26f97aa
--- /dev/null
@@ -0,0 +1,10 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface RebalanceListener extends ConsumerRebalanceListener
+{
+  void enableCommits();
+  void disableCommits();
+}
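
The two switch methods let EndlessConsumer steer offset handling across rebalances: commits stay enabled in normal operation, but are disabled once an unexpected error makes the in-memory state suspect, so stale offsets are not committed during the following rebalance and the records are consumed again after a restart. A condensed sketch of that interplay (see the EndlessConsumer diff below for the real flow):

  // Condensed sketch - not part of the commit.
  rebalanceListener.enableCommits();
  consumer.subscribe(Arrays.asList(topic), rebalanceListener);
  try
  {
    // ... endless poll/handle loop ...
  }
  catch (Exception e)
  {
    rebalanceListener.disableCommits(); // keep the old offsets -> reprocessing
    shutdown(e);
  }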
diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java
new file mode 100644 (file)
index 0000000..ae8eb51
--- /dev/null
@@ -0,0 +1,41 @@
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+@Document(collection = "state")
+@ToString
+public class StateDocument
+{
+  @Id
+  public String id;
+  public Map<String, AdderResult> state;
+  public Map<String, List<AdderResult>> results;
+
+  public StateDocument()
+  {
+  }
+
+  public StateDocument(Integer partition)
+  {
+    this.id = Integer.toString(partition);
+    this.state = new HashMap<>();
+    this.results = new HashMap<>();
+  }
+
+  public StateDocument(
+      Integer partition,
+      Map<String, AdderResult> state,
+      Map<String, List<AdderResult>> results)
+  {
+    this.id = Integer.toString(partition);
+    this.state = state;
+    this.results = results;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java
new file mode 100644 (file)
index 0000000..3129535
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
+public interface StateRepository extends MongoRepository<StateDocument, String>
+{
+  public Optional<StateDocument> findById(String partition);
+}
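
Together with StateDocument, this closes the loop around a rebalance: one MongoDB document per partition, loaded on assignment and written back on revocation. A condensed sketch of that cycle, mirroring ApplicationRebalanceListener above:

  // On assignment: restore state and results for the partition (or start empty).
  StateDocument document = stateRepository
      .findById(Integer.toString(partition))
      .orElse(new StateDocument(partition));
  recordHandler.addPartition(partition, document.state);
  adderResults.addPartition(partition, document.results);

  // On revocation (after consumer.commitSync()): persist what was built up.
  stateRepository.save(new StateDocument(
      partition,
      recordHandler.removePartition(partition),
      adderResults.removePartition(partition)));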
index f8bfe7e..26948f5 100644 (file)
@@ -1,10 +1,11 @@
-consumer:
-  bootstrap-server: :9092
-  group-id: my-group
-  client-id: DEV
-  topic: test
-  auto-offset-reset: earliest
-  commit-interval: 5s
+sumup:
+  adder:
+    bootstrap-server: :9092
+    group-id: my-group
+    client-id: DEV
+    topic: out
+    auto-offset-reset: earliest
+    commit-interval: 5s
 management:
   endpoint:
     shutdown:
@@ -25,6 +26,11 @@ info:
     group-id: ${consumer.group-id}
     topic: ${consumer.topic}
     auto-offset-reset: ${consumer.auto-offset-reset}
+spring:
+  data:
+    mongodb:
+      uri: mongodb://juplo:training@localhost:27017
+      database: juplo
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java
new file mode 100644 (file)
index 0000000..8e49263
--- /dev/null
@@ -0,0 +1,117 @@
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.*;
+
+
+public class AdderBusinessLogicTest
+{
+  @Test
+  @DisplayName("An empty Optional should be returned, for a non-existing sum")
+  public void testGetSumReturnsEmptyOptionalForNonExistingSum()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThat(adder.getSum("foo")).isEmpty();
+  }
+
+  @Test
+  @DisplayName("A non-empty Optional should be returned, for an existing sum")
+  public void testGetSumReturnsNonEmptyOptionalForExistingSum()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    adder.addToSum("foo", 6);
+    assertThat(adder.getSum("foo")).isNotEmpty();
+  }
+
+  @Test
+  @DisplayName("A sum can be calculated, if it does exist")
+  public void testCalculatePossibleIfSumExists()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    adder.addToSum("foo", 6);
+    assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
+  }
+
+  @Test
+  @DisplayName("An existing sum is removed, if ended")
+  public void testCalculateRemovesSumIfSumExists()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    adder.addToSum("foo", 6);
+    adder.calculate("foo");
+    assertThat(adder.getSum("foo")).isEmpty();
+  }
+
+  @Test
+  @DisplayName("An existing sum returns a non-null value, if calculated")
+  public void testCalculateReturnsNonNullValueIfSumExists()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    adder.addToSum("foo", 6);
+    assertThat(adder.calculate("foo")).isNotNull();
+  }
+
+  @Test
+  @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
+  public void testCalculateCausesExceptionIfNotExists()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
+  }
+
+  @Test
+  @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
+  public void testAddToSumWithNullValueCausesException()
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
+  }
+
+  @ParameterizedTest(name = "{index}: Adding {0}")
+  @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
+  @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
+  public void testAddToSumWithNonPositiveValueCausesException(int value)
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
+  }
+
+  @ParameterizedTest(name = "{index}: Adding {0}")
+  @DisplayName("Can add a positive value to a sum")
+  @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
+  public void testAddToSumWithPositiveValuePossible(int value)
+  {
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
+  }
+
+  @ParameterizedTest(name = "{index}: Summing up {0}")
+  @DisplayName("Adds up numbers correctly")
+  @MethodSource("numbersProvider")
+  public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
+  {
+    long expectedResult = Arrays.stream(numbers).sum();
+    AdderBusinessLogic adder = new AdderBusinessLogic();
+    Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
+    AdderResult result = adder.calculate("foo");
+    assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
+    assertThat(result.sum).isEqualTo(expectedResult);
+  }
+
+  static Stream<Arguments> numbersProvider() {
+    return Stream.of(
+        Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
+        Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
+        Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
+  }
+}
index 67b9d75..3711a83 100644 (file)
@@ -15,7 +15,8 @@ import static de.juplo.kafka.ApplicationIT.TOPIC;
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
         "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "consumer.topic=" + TOPIC })
+        "consumer.topic=" + TOPIC,
+        "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC)
 @AutoConfigureDataMongo
 public class ApplicationIT
index b7f8308..a150176 100644 (file)
 package de.juplo.kafka;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.test.context.ContextConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.*;
 
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 
-@ContextConfiguration(classes = ApplicationTests.Configuration.class)
-public class ApplicationTests extends GenericApplicationTests<String, Long>
+@Slf4j
+public class ApplicationTests extends GenericApplicationTests<String, String>
 {
+  @Autowired
+  StateRepository stateRepository;
+
+
   public ApplicationTests()
   {
-    super(
-        new RecordGenerator()
+    super(new ApplicationTestRecordGenerator());
+    ((ApplicationTestRecordGenerator) recordGenerator).tests = this;
+  }
+
+
+  static class ApplicationTestRecordGenerator implements RecordGenerator
+  {
+    ApplicationTests tests;
+
+    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
+    final String[] dieWilden13 =
+      IntStream
+        .range(1, 14)
+        .mapToObj(i -> "seeräuber-" + i)
+        .toArray(i -> new String[i]);
+    final StringSerializer stringSerializer = new StringSerializer();
+    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+
+    int counter = 0;
+
+    Map<String, List<AdderResult>> state;
+
+    @Override
+    public int generate(
+      boolean poisonPills,
+      boolean logicErrors,
+      Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+    {
+      counter = 0;
+      state =
+        Arrays
+          .stream(dieWilden13)
+          .collect(Collectors.toMap(
+            seeräuber -> seeräuber,
+            seeräuber -> new LinkedList<>()));
+
+      int[] number = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
+      int[] message = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
+      int next = 0;
+
+      for (int pass = 0; pass < 333; pass++)
+      {
+        for (int i = 0; i < 13; i++)
         {
-          final StringSerializer stringSerializer = new StringSerializer();
-          final LongSerializer longSerializer = new LongSerializer();
+          String seeräuber = dieWilden13[i];
+          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
+
+          if (message[i] > number[i])
+          {
+            send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
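+            // Expected result for the messages 1..n: n*(n+1)/2 (Gauss sum)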
+            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
+            // Pick next number to calculate
+            number[i] = numbers[next++ % numbers.length];
+            message[i] = 1;
+            log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
+          }
+
+          Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
+          send(key, value, fail(logicErrors, pass, counter), messageSender);
+        }
+      }
+
+      return counter;
+    }
 
+    boolean fail(boolean logicErrors, int pass, int counter)
+    {
+      return logicErrors && pass > 300 && counter % 77 == 0;
+    }
+
+    void send(
+      Bytes key,
+      Bytes value,
+      boolean fail,
+      Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+    {
+      counter++;
+
+      if (fail)
+      {
+        value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+      }
+
+      messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
+    }
+
+    @Override
+    public boolean canGeneratePoisonPill()
+    {
+      return false;
+    }
 
-          @Override
-          public int generate(
-              boolean poisonPills,
-              boolean logicErrors,
-              Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+    @Override
+    public void assertBusinessLogic()
+    {
+      for (int i = 0; i < PARTITIONS; i++)
+      {
+        StateDocument stateDocument =
+          tests.stateRepository.findById(Integer.toString(i)).get();
+
+        stateDocument
+          .results
+          .entrySet()
+          .stream()
+          .forEach(entry ->
           {
-            int i = 0;
+            String user = entry.getKey();
+            List<AdderResult> resultsForUser = entry.getValue();
 
-            for (int partition = 0; partition < 10; partition++)
+            for (int j = 0; j < resultsForUser.size(); j++)
             {
-              for (int key = 0; key < 10000; key++)
+              if (!(j < state.get(user).size()))
               {
-                i++;
-
-                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
-                if (i == 99977)
-                {
-                  if (logicErrors)
-                  {
-                    value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
-                  }
-                  if (poisonPills)
-                  {
-                    value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
-                  }
-                }
-
-                ProducerRecord<Bytes, Bytes> record =
-                    new ProducerRecord<>(
-                        TOPIC,
-                        partition,
-                        new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
-                        value);
-
-                messageSender.accept(record);
+                break;
               }
-            }
-
-            return i;
-          }
-        });
-  }
 
+              assertThat(resultsForUser.get(j))
+                .as("Unexpected results calculation %d of user %s", j, user)
+                .isEqualTo(state.get(user).get(j));
+            }
 
-  @TestConfiguration
-  public static class Configuration
-  {
-    @Bean
-    public RecordHandler<String, Long> applicationRecordHandler()
-    {
-      return (record) ->
-      {
-        if (record.value() == Long.MIN_VALUE)
-          throw new RuntimeException("BOOM (Logic-Error)!");
-      };
+            assertThat(state.get(user))
+              .as("More results calculated for user %s as expected", user)
+              .containsAll(resultsForUser);
+          });
+      }
     }
   }
-}
+}
\ No newline at end of file
index 4883f75..e16aea7 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -11,6 +12,9 @@ import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.mongo.MongoProperties;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
@@ -35,10 +39,13 @@ import static org.awaitility.Awaitility.*;
 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 @TestPropertySource(
                properties = {
-                               "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "consumer.topic=" + TOPIC,
-                               "consumer.commit-interval=500ms" })
+                               "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "sumup.adder.topic=" + TOPIC,
+                               "sumup.adder.commit-interval=500ms",
+                               "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
 @Slf4j
 abstract class GenericApplicationTests<K, V>
 {
@@ -53,11 +60,16 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        ApplicationProperties applicationProperties;
        @Autowired
+       MongoClient mongoClient;
+       @Autowired
+       MongoProperties mongoProperties;
+       @Autowired
+       RebalanceListener rebalanceListener;
+       @Autowired
        TestRecordHandler<K, V> recordHandler;
        @Autowired
        EndlessConsumer<K, V> endlessConsumer;
 
-
        KafkaProducer<Bytes, Bytes> testRecordProducer;
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        Map<TopicPartition, Long> oldOffsets;
@@ -328,6 +340,7 @@ abstract class GenericApplicationTests<K, V>
                props.put("value.deserializer", BytesDeserializer.class.getName());
                offsetConsumer = new KafkaConsumer<>(props);
 
+               mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
                seekToEnd();
 
                oldOffsets = new HashMap<>();