Verbesserte Tests und Korrekturen gemerged: sumup-adder -> stored-offsets
author: Kai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 18:52:49 +0000 (20:52 +0200)
committer: Kai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 18:56:09 +0000 (20:56 +0200)
15 files changed:
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/AdderBusinessLogic.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/StateDocument.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java [deleted file]
src/test/java/de/juplo/kafka/ApplicationTests.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 2845ab1..133af42 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/sumup-adder:1.0-SNAPSHOT
+IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -9,14 +9,14 @@ then
   exit
 fi
 
-docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express
+docker-compose up -d zookeeper kafka cli mongo express
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
   "$1" = "build"
 ]]
 then
-  docker-compose rm -svf adder
+  docker-compose rm -svf peter beate
   mvn clean install || exit
 else
   echo "Using image existing images:"
@@ -26,19 +26,16 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d gateway requests adder
+docker-compose up -d producer peter beate
 
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests..."; sleep 1; done
-while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder..."; sleep 1; done
+sleep 15
 
-echo 66  | http -v :8080/foo
-echo 666 | http -v :8080/bar
+http -v post :8082/stop
+sleep 10
+docker-compose kill -s 9 peter
+http -v post :8082/start
+sleep 60
 
-sleep 5
-
-http -v :8082/state
-http -v :8082/state/foo
-http -v :8082/state/bar
-
-docker-compose logs adder
+docker-compose stop producer peter beate
+docker-compose logs beate
+docker-compose logs --tail=10 peter
index fec5bca..7ab77b2 100644 (file)
@@ -7,56 +7,20 @@ services:
     ports:
       - 2181:2181
 
-  kafka-1:
+  kafka:
     image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
-      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
-    ports:
-      - 9081:9081
-    depends_on:
-      - zookeeper
-
-  kafka-2:
-    image: confluentinc/cp-kafka:7.1.3
-    environment:
-      KAFKA_BROKER_ID: 2
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     ports:
       - 9092:9082
       - 9082:9082
-    networks:
-      default:
-        aliases:
-          - kafka
-    depends_on:
-      - zookeeper
-
-  kafka-3:
-    image: confluentinc/cp-kafka:7.1.3
-    environment:
-      KAFKA_BROKER_ID: 3
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
-      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
-    ports:
-      - 9083:9083
     depends_on:
       - zookeeper
 
@@ -83,44 +47,46 @@ services:
     image: juplo/toolbox
     command: >
       bash -c "
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
-        kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
-        kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 1 --replication-factor 1 
-        kafka-topics --bootstrap-server kafka:9092 --describe --topic in
-        kafka-topics --bootstrap-server kafka:9092 --describe --topic out
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
       "
 
   cli:
     image: juplo/toolbox
     command: sleep infinity
 
-  gateway:
-    image: juplo/sumup-gateway:1.0-SNAPSHOT
+  producer:
+    image: juplo/endless-long-producer:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
       server.port: 8080
-      sumup.gateway.bootstrap-server: kafka:9092
-      sumup.gateway.client-id: gateway
-      sumup.gateway.topic: in
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer
+      producer.topic: test
+      producer.throttle-ms: 500
+
 
-  requests:
-    image: juplo/sumup-requests:1.0-SNAPSHOT
+  peter:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
       server.port: 8080
-      sumup.requests.bootstrap-server: kafka:9092
-      sumup.requests.client-id: requests
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: peter
+      consumer.topic: test
+      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+      spring.data.mongodb.database: juplo
 
-  adder:
-    image: juplo/sumup-adder:1.0-SNAPSHOT
+  beate:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
       server.port: 8080
-      sumup.adder.bootstrap-server: kafka:9092
-      sumup.adder.client-id: adder
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: beate
+      consumer.topic: test
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
diff --git a/pom.xml b/pom.xml
index ecb559a..fa78c70 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-adder</artifactId>
+  <artifactId>endless-consumer</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>SumUp Adder</name>
-  <description>Calculates the sum for the send messages</description>
+  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
 
   <properties>
     <java.version>11</java.version>
       <artifactId>de.flapdoodle.embed.mongo</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.assertj</groupId>
-      <artifactId>assertj-core</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java
deleted file mode 100644 (file)
index 1f3d9aa..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-package de.juplo.kafka;
-
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-public class AdderBusinessLogic
-{
-  private final Map<String, Long> state;
-
-
-  public AdderBusinessLogic()
-  {
-    this(new HashMap<>());
-  }
-
-  public AdderBusinessLogic(Map<String, Long> state)
-  {
-    this.state = state;
-  }
-
-
-  public synchronized void startSum(String user)
-  {
-    if (state.containsKey(user))
-      throw new IllegalStateException("Sumation for " + user + " already in progress, state: " + state.get(user));
-
-    state.put(user, 0l);
-  }
-
-  public synchronized Optional<Long> getSum(String user)
-  {
-    return Optional.ofNullable(state.get(user));
-  }
-
-  public synchronized void addToSum(String user, Integer value)
-  {
-    if (!state.containsKey(user))
-      throw new IllegalStateException("No sumation for " + user + " in progress");
-    if (value == null || value < 1)
-      throw new IllegalArgumentException("Not a positive number: " + value);
-
-    long result = state.get(user) + value;
-    state.put(user, result);
-  }
-
-  public synchronized Long endSum(String user)
-  {
-    if (!state.containsKey(user))
-      throw new IllegalStateException("No sumation for " + user + " in progress");
-
-    return state.remove(user);
-  }
-
-  protected Map<String, Long> getState()
-  {
-    return state;
-  }
-}
index 9f54083..a9d9b15 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
@@ -27,7 +28,7 @@ public class ApplicationConfiguration
   public ApplicationRebalanceListener rebalanceListener(
       ApplicationRecordHandler recordHandler,
       StateRepository stateRepository,
-      Consumer<String, String> consumer,
+      Consumer<String, Long> consumer,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
@@ -41,8 +42,8 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public EndlessConsumer<String, String> endlessConsumer(
-      KafkaConsumer<String, String> kafkaConsumer,
+  public EndlessConsumer<String, Long> endlessConsumer(
+      KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
@@ -65,7 +66,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
@@ -75,9 +76,10 @@ public class ApplicationConfiguration
     props.put("client.id", properties.getClientId());
     props.put("enable.auto.commit", false);
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
+    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
+    props.put("value.deserializer", LongDeserializer.class.getName());
 
     return new KafkaConsumer<>(props);
   }
index df4e653..dc3a26e 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, String> consumer;
+  private final EndlessConsumer<String, Long> consumer;
 
 
   @Override
index 410c623..14e928f 100644 (file)
@@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull;
 import java.time.Duration;
 
 
-@ConfigurationProperties(prefix = "sumup.adder")
+@ConfigurationProperties(prefix = "consumer")
 @Validated
 @Getter
 @Setter
index 542af2d..444b7b7 100644 (file)
@@ -22,7 +22,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
   private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
-  private final Consumer<String, String> consumer;
+  private final Consumer<String, Long> consumer;
 
   private Instant lastCommit = Instant.EPOCH;
   private boolean commitsEnabled = true;
@@ -85,10 +85,10 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+      recordHandler.getState().forEach((partiton, state) -> stateRepository.save(
           new StateDocument(
               partiton,
-              adder.getState(),
+              state,
               consumer.position(new TopicPartition(topic, partiton)))));
       lastCommit = clock.instant();
     }
index d0d385c..c2c2657 100644 (file)
@@ -8,46 +8,38 @@ import java.util.Map;
 
 
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
 {
-  private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+  private final Map<Integer, Map<String, Long>> state = new HashMap<>();
 
 
   @Override
-  public void accept(ConsumerRecord<String, String> record)
+  public void accept(ConsumerRecord<String, Long> record)
   {
     Integer partition = record.partition();
-    String user = record.key();
-    String message = record.value();
-    switch (message)
-    {
-      case "START":
-        state.get(partition).startSum(user);
-        break;
-
-      case "END":
-        Long result = state.get(partition).endSum(user);
-        log.info("New result for {}: {}", user, result);
-        break;
-
-      default:
-        state.get(partition).addToSum(user, Integer.parseInt(message));
-        break;
-    }
+    String key = record.key() == null ? "NULL" : record.key().toString();
+    Map<String, Long> byKey = state.get(partition);
+
+    if (!byKey.containsKey(key))
+      byKey.put(key, 0l);
+
+    long seenByKey = byKey.get(key);
+    seenByKey++;
+    byKey.put(key, seenByKey);
   }
 
   protected void addPartition(Integer partition, Map<String, Long> state)
   {
-    this.state.put(partition, new AdderBusinessLogic(state));
+    this.state.put(partition, state);
   }
 
   protected Map<String, Long> removePartition(Integer partition)
   {
-    return this.state.remove(partition).getState();
+    return this.state.remove(partition);
   }
 
 
-  public Map<Integer, AdderBusinessLogic> getState()
+  public Map<Integer, Map<String, Long>> getState()
   {
     return state;
   }
index d389271..09fb762 100644 (file)
@@ -2,13 +2,10 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
 
 
 @RestController
@@ -35,27 +32,7 @@ public class DriverController
   @GetMapping("state")
   public Map<Integer, Map<String, Long>> state()
   {
-    return
-        recordHandler
-            .getState()
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                entry -> entry.getKey(),
-                entry -> entry.getValue().getState()));
-  }
-
-  @GetMapping("state/{user}")
-  public ResponseEntity<Long> seen(@PathVariable String user)
-  {
-    for (AdderBusinessLogic adder : recordHandler.getState().values())
-    {
-      Optional<Long> sum = adder.getSum(user);
-      if (sum.isPresent())
-        return ResponseEntity.ok(sum.get());
-    }
-
-    return ResponseEntity.notFound().build();
+    return recordHandler.getState();
   }
 
 
index 0540e3f..bb1c701 100644 (file)
@@ -27,10 +27,7 @@ public class StateDocument
     this.state = new HashMap<>();
   }
 
-  public StateDocument(
-      Integer partition,
-      Map<String, Long> state,
-      long offset)
+  public StateDocument(Integer partition, Map<String, Long> state, long offset)
   {
     this.id = Integer.toString(partition);
     this.state = state;
index 26948f5..fc1c68a 100644 (file)
@@ -1,11 +1,10 @@
-sumup:
-  adder:
-    bootstrap-server: :9092
-    group-id: my-group
-    client-id: DEV
-    topic: out
-    auto-offset-reset: earliest
-    commit-interval: 5s
+consumer:
+  bootstrap-server: :9092
+  group-id: my-group
+  client-id: DEV
+  topic: test
+  auto-offset-reset: earliest
+  commit-interval: 5s
 management:
   endpoint:
     shutdown:
diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java
deleted file mode 100644 (file)
index 435f036..0000000
+++ /dev/null
@@ -1,152 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.Arrays;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.*;
-
-
-public class AdderBusinessLogicTest
-{
-  @Test
-  @DisplayName("A new sum can be started, if it does not exist")
-  public void testStartSumPossibleIfNotExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatNoException().isThrownBy(() -> adder.startSum("foo"));
-  }
-
-  @Test
-  @DisplayName("Starting an already existing sum again, causes an IllegalStateException")
-  public void testStartSumCausesExceptionIfExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThatIllegalStateException().isThrownBy(() -> adder.startSum("foo"));
-  }
-
-  @Test
-  @DisplayName("An empty Optional should be returned, for a non-existing sum")
-  public void testGetSumReturnsEmptyOptionalForNonExistingSum()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThat(adder.getSum("foo")).isEmpty();
-  }
-
-  @Test
-  @DisplayName("A non-empty Optional should be returned, for an existing sum")
-  public void testGetSumReturnsNonEmptyOptionalForExistingSum()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThat(adder.getSum("foo")).isNotEmpty();
-  }
-
-  @Test
-  @DisplayName("A sum can be ended, if it does exist")
-  public void testEndSumPossibleIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThatNoException().isThrownBy(() -> adder.endSum("foo"));
-  }
-
-  @Test
-  @DisplayName("An existing sum is removed, if ended")
-  public void testEndSumRemovesSumIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    adder.endSum("foo");
-    assertThat(adder.getSum("foo")).isEmpty();
-  }
-
-  @Test
-  @DisplayName("An existing Sum returns a non-null value, if ended")
-  public void testEndSumReturnsNonNullValueIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThat(adder.endSum("foo")).isNotNull();
-  }
-
-  @Test
-  @DisplayName("An existing Sum returns a non-negative value, if ended")
-  public void testEndSumReturnsNonNegativeValueIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThat(adder.endSum("foo")).isNotNegative();
-  }
-
-  @Test
-  @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
-  public void testEndSumCausesExceptionIfNotExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalStateException().isThrownBy(() -> adder.endSum("foo"));
-  }
-
-  @Test
-  @DisplayName("Adding to a non-existent sum causes an IllegalStateException")
-  public void testAddToSumCausesExceptionIfNotExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalStateException().isThrownBy(() -> adder.addToSum("foo", 1));
-  }
-
-  @Test
-  @DisplayName("Adding a null-value to an existing sum causes an IllegalArgumentException")
-  public void testAddSumWithNullValueToExistingSumCausesException()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
-  }
-
-  @ParameterizedTest(name = "{index}: Adding {0}")
-  @DisplayName("Adding a non-positive value to an existing sum causes an IllegalArgumentException")
-  @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
-  public void testAddSumWithNonPositiveValueToExistingSumCausesException(int value)
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
-  }
-
-  @Test
-  @DisplayName("Can add a positive value to an existing sum")
-  public void testAddSumWithPositiveValuePossibleIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", -1));
-  }
-
-  @ParameterizedTest(name = "{index}: Summing up {0}")
-  @DisplayName("Adds up numbers correctly")
-  @MethodSource("numbersProvider")
-  public void testAddSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
-  {
-    long expectedResult = Arrays.stream(numbers).sum();
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.startSum("foo");
-    Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
-    assertThat(adder.endSum("foo")).isEqualTo(expectedResult);
-  }
-
-  static Stream<Arguments> numbersProvider() {
-    return Stream.of(
-        Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
-        Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
-        Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
-  }
-}
index 4ddf8a9..5166227 100644 (file)
@@ -1,31 +1,28 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.test.context.ContextConfiguration;
 
 import java.util.function.Consumer;
-import java.util.stream.IntStream;
 
 
-public class ApplicationTests extends GenericApplicationTests<String, String>
+@ContextConfiguration(classes = ApplicationTests.Configuration.class)
+public class ApplicationTests extends GenericApplicationTests<String, Long>
 {
   public ApplicationTests()
   {
     super(
         new RecordGenerator()
         {
-          final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 };
-          final String[] dieWilden13 =
-              IntStream
-                  .range(1,14)
-                  .mapToObj(i -> "seeräuber-" + i)
-                  .toArray(i -> new String[i]);
           final StringSerializer stringSerializer = new StringSerializer();
-          final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START"));
-          final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END"));
-
-          int counter = 0;
+          final LongSerializer longSerializer = new LongSerializer();
 
 
           @Override
@@ -34,51 +31,62 @@ public class ApplicationTests extends GenericApplicationTests<String, String>
               boolean logicErrors,
               Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
           {
-            counter = 0;
+            int i = 0;
 
-            for (int i = 0; i < 33; i++)
+            for (int partition = 0; partition < 10; partition++)
             {
-              String seeräuber = dieWilden13[i%13];
-              int number = numbers[i%7];
-
-              Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-              send(key, startMessage, logicErrors, messageSender);
-              for (int message = 1; message <= number; message++)
+              for (int key = 0; key < 10; key++)
               {
-                Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message)));
-                send(key, value, logicErrors, messageSender);
-              }
-              send(key, endMessage, logicErrors, messageSender);
-            }
+                i++;
 
-            return counter;
-          }
+                Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
+                if (i == 77)
+                {
+                  if (logicErrors)
+                  {
+                    value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+                  }
+                  if (poisonPills)
+                  {
+                    value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+                  }
+                }
 
-          void send(
-              Bytes key,
-              Bytes value,
-              boolean logicErrors,
-              Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-          {
-            counter++;
+                ProducerRecord<Bytes, Bytes> record =
+                    new ProducerRecord<>(
+                        TOPIC,
+                        partition,
+                        new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
+                        value);
 
-            if (counter == 77)
-            {
-              if (logicErrors)
-              {
-                value = value.equals(startMessage) ? endMessage : startMessage;
+                messageSender.accept(record);
               }
             }
 
-            messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
-          }
-
-          @Override
-          public boolean canGeneratePoisonPill()
-          {
-            return false;
+            return i;
           }
         });
   }
+
+
+  @TestConfiguration
+  public static class Configuration
+  {
+    @Primary
+    @Bean
+    public ApplicationRecordHandler recordHandler()
+    {
+      ApplicationRecordHandler recordHandler = new ApplicationRecordHandler();
+      return new ApplicationRecordHandler()
+      {
+        @Override
+        public void accept(ConsumerRecord<String, Long> record)
+        {
+          if (record.value() == Long.MIN_VALUE)
+            throw new RuntimeException("BOOM (Logic-Error)!");
+          super.accept(record);
+        }
+      };
+    }
+  }
 }
index 9a6f812..fa3d911 100644 (file)
@@ -37,9 +37,9 @@ import static org.awaitility.Awaitility.*;
 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 @TestPropertySource(
                properties = {
-                               "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "sumup.adder.topic=" + TOPIC,
-                               "sumup.adder.commit-interval=1s",
+                               "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "consumer.topic=" + TOPIC,
+                               "consumer.commit-interval=1s",
                                "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration