Einfacher geht es nicht mehr
authorKai Moritz <kai@juplo.de>
Sun, 18 Sep 2022 15:52:12 +0000 (17:52 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 18 Sep 2022 16:12:30 +0000 (18:12 +0200)
* Die `id` kann nur weggelassen werden, wenn `assign()` verwendet wird.

22 files changed:
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/AdderBusinessLogic.java [deleted file]
src/main/java/de/juplo/kafka/AdderResult.java [deleted file]
src/main/java/de/juplo/kafka/AdderResults.java [deleted file]
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationController.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationProperties.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java [deleted file]
src/main/java/de/juplo/kafka/Message.java [deleted file]
src/main/java/de/juplo/kafka/MessageAddNumber.java [deleted file]
src/main/java/de/juplo/kafka/MessageCalculateSum.java [deleted file]
src/main/java/de/juplo/kafka/StateDocument.java [deleted file]
src/main/java/de/juplo/kafka/StateRepository.java [deleted file]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java [deleted file]
src/test/java/de/juplo/kafka/ApplicationIT.java [deleted file]
src/test/java/de/juplo/kafka/ApplicationTests.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/MessageTest.java [deleted file]

index a2d813d..69d05ee 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/sumup-adder-springified:1.0-SNAPSHOT
+IMAGE=juplo/supersimple:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -9,17 +9,16 @@ then
   exit
 fi
 
-docker-compose rm -svf adder-1 adder-2
-docker-compose rm -svf mongo
-docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express
+docker-compose rm -svf supersimple
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
   "$1" = "build"
 ]]
 then
-  docker-compose rm -svf adder-1 adder-2
-  mvn -D skipTests clean install || exit
+  docker-compose rm -svf supersimple
+  mvn clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
@@ -28,84 +27,14 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d gateway requests-1 requests-2
+docker-compose up -d gateway requests-1 requests-2 supersimple
 
 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
 while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done
 while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done
 
-docker-compose up -d peter klaus
+echo 6 | http -v :8080/peter
 
-docker-compose up -d adder-1
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
-http -v --pretty none -S :8091/results
-echo
+sleep 10
 
-sleep 3
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-
-docker-compose up -d adder-2
-while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
-while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
-http -v --pretty none -S :8092/results
-echo
-
-sleep 3
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose stop adder-1
-until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
-
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose kill -s 9 adder-2
-docker-compose start adder-1
-docker-compose kill -s 9 peter klaus
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
-
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-sleep 5
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
+docker-compose logs supersimple
index a3da553..69e67a2 100644 (file)
@@ -60,38 +60,15 @@ services:
     depends_on:
       - zookeeper
 
-  mongo:
-    image: mongo:4.4.13
-    ports:
-      - 27017:27017
-    environment:
-      MONGO_INITDB_ROOT_USERNAME: juplo
-      MONGO_INITDB_ROOT_PASSWORD: training
-
-  express:
-    image: mongo-express
-    ports:
-      - 8090:8081
-    environment:
-      ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
-      ME_CONFIG_MONGODB_ADMINPASSWORD: training
-      ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
-    depends_on:
-      - mongo
 
   setup:
     image: juplo/toolbox
     command: >
       bash -c "
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out.DLT
-        kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
-        kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2  
-        kafka-topics --bootstrap-server kafka:9092 --create --topic out.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2  
+        kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2  
         kafka-topics --bootstrap-server kafka:9092 --describe --topic in
         kafka-topics --bootstrap-server kafka:9092 --describe --topic out
-        kafka-topics --bootstrap-server kafka:9092 --describe --topic out.DLT
       "
 
   cli:
@@ -126,58 +103,13 @@ services:
       sumup.requests.bootstrap-server: kafka:9092
       sumup.requests.client-id: requests-2
 
-  adder-1:
-    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
-    ports:
-      - 8091:8080
-    environment:
-      server.port: 8080
-      spring.kafka.bootstrap-servers: kafka:9092
-      spring.kafka.producer.bootstrap-servers: kafka:9092
-      spring.kafak.client-id: adder-1
-      spring.kafka.auto-commit-interval: 1s
-      sumup.adder.throttle: 3ms
-      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
-      spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: INFO
-
-  adder-2:
-    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
+  supersimple:
+    image: juplo/supersimple:1.0-SNAPSHOT
     ports:
-      - 8092:8080
+      - 8090:8080
     environment:
       server.port: 8080
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.producer.bootstrap-servers: kafka:9092
-      spring.kafak.client-id: adder-2
-      spring.kafka.auto-commit-interval: 1s
-      sumup.adder.throttle: 3ms
-      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
-      spring.data.mongodb.database: juplo
+      spring.kafak.client-id: supersimple
       logging.level.org.apache.kafka.clients.consumer: INFO
-
-  peter:
-    image: juplo/toolbox
-    command: >
-      bash -c "
-      while [[ true ]];
-      do
-        echo 666 | http -v gateway:8080/peter;
-        sleep 1;
-      done
-      "
-  klaus:
-    image: juplo/toolbox
-    command: >
-      bash -c "
-      while [[ true ]];
-      do
-        echo 666 | http -v gateway:8080/klaus;
-        sleep 1;
-      done
-      "
-
-  dlt:
-    image: juplo/toolbox
-    tty: true
-    command: kafkacat -C -b kafka:9092 -t out.DLT -f'p=%p|o=%o|%k=%s\n' -o 0 -q
diff --git a/pom.xml b/pom.xml
index a252d1c..9fa4884 100644 (file)
--- a/pom.xml
+++ b/pom.xml
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-adder-springified</artifactId>
+  <artifactId>supersimple</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>SumUp Adder</name>
-  <description>Calculates the sum for the send messages. This version consumes JSON-messages.</description>
+  <name>Supersimple Consumer-Group</name>
+  <description>Most minimal Consumer-Group ever!</description>
 
   <properties>
     <java.version>11</java.version>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-data-mongodb</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-validation</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-actuator</artifactId>
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>de.flapdoodle.embed</groupId>
-      <artifactId>de.flapdoodle.embed.mongo</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java
deleted file mode 100644 (file)
index d525182..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-package de.juplo.kafka;
-
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-public class AdderBusinessLogic
-{
-  private final Map<String, AdderResult> state;
-
-
-  public AdderBusinessLogic()
-  {
-    this(new HashMap<>());
-  }
-
-  public AdderBusinessLogic(Map<String, AdderResult> state)
-  {
-    this.state = state;
-  }
-
-
-  public synchronized Optional<Long> getSum(String user)
-  {
-    return Optional.ofNullable(state.get(user)).map(result -> result.sum);
-  }
-
-  public synchronized void addToSum(String user, Integer value)
-  {
-    if (value == null || value < 1)
-      throw new IllegalArgumentException("Not a positive number: " + value);
-
-    long sum =
-        Optional
-            .ofNullable(state.get(user))
-            .map(result -> result.sum)
-            .orElse(0l);
-    state.put(user, new AdderResult(value, sum + value));
-  }
-
-  public synchronized AdderResult calculate(String user)
-  {
-    if (!state.containsKey(user))
-      throw new IllegalStateException("No sumation for " + user + " in progress");
-
-    return state.remove(user);
-  }
-
-  protected Map<String, AdderResult> getState()
-  {
-    return state;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java
deleted file mode 100644 (file)
index 44b7da8..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode
-public class AdderResult
-{
-  final int number;
-  final long sum;
-
-  @Override
-  public String toString()
-  {
-    return "sum(" + number + ") = " + sum;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java
deleted file mode 100644 (file)
index e7f5602..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-package de.juplo.kafka;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-
-public class AdderResults
-{
-  private final Map<Integer, Map<String, List<AdderResult>>> results = new HashMap<>();
-
-
-  public void addResults(Integer partition, String user, AdderResult result)
-  {
-    Map<String, List<AdderResult>> resultsByUser = this.results.get(partition);
-
-    List<AdderResult> results = resultsByUser.get(user);
-    if (results == null)
-    {
-      results = new LinkedList<>();
-      resultsByUser.put(user, results);
-    }
-
-    results.add(result);
-  }
-
-  protected void addPartition(Integer partition, Map<String, List<AdderResult>> results)
-  {
-    this.results.put(partition, results);
-  }
-
-  protected Map<String, List<AdderResult>> removePartition(Integer partition)
-  {
-    return this.results.remove(partition);
-  }
-
-  public Map<Integer, Map<String, List<AdderResult>>> getState()
-  {
-    return results;
-  }
-
-  public Map<String, List<AdderResult>> getState(Integer partition)
-  {
-    return results.get(partition);
-  }
-}
index 69a9712..0f9ea12 100644 (file)
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
-import org.springframework.kafka.listener.DefaultErrorHandler;
-import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.backoff.FixedBackOff;
-
-import java.util.Map;
-import java.util.Optional;
-
+import org.springframework.kafka.annotation.KafkaListener;
 
 @SpringBootApplication
-@Slf4j
-@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 @EnableKafka
+@Slf4j
 public class Application
 {
-  @Bean
-  public ApplicationRecordHandler applicationRecordHandler(
-      AdderResults adderResults,
-      KafkaProperties kafkaProperties,
-      ApplicationProperties applicationProperties)
-  {
-    return new ApplicationRecordHandler(
-        adderResults,
-        Optional.ofNullable(applicationProperties.getThrottle()),
-        kafkaProperties.getConsumer().getGroupId());
-  }
-
-  @Bean
-  public AdderResults adderResults()
-  {
-    return new AdderResults();
-  }
-
-  @Bean
-  public ApplicationRebalanceListener rebalanceListener(
-      ApplicationRecordHandler recordHandler,
-      AdderResults adderResults,
-      StateRepository stateRepository,
-      KafkaProperties kafkaProperties)
-  {
-    return new ApplicationRebalanceListener(
-        recordHandler,
-        adderResults,
-        stateRepository,
-        kafkaProperties.getConsumer().getGroupId());
-  }
-
-  @Bean
-  ApplicationHealthIndicator applicationHealthIndicator(
-      KafkaListenerEndpointRegistry registry,
-      KafkaProperties properties)
+  @KafkaListener(id = "supersimple", topics = "out")
+  public void recieve(String message)
   {
-    return new ApplicationHealthIndicator(
-        properties.getConsumer().getGroupId(),
-        registry);
+    log.info("Recieved message: {}", message);
   }
 
-  @Bean
-  public ProducerFactory<String, Object> producerFactory(
-      KafkaProperties properties)
-  {
-    return new DefaultKafkaProducerFactory<>(
-        properties.getProducer().buildProperties(),
-        new StringSerializer(),
-        new DelegatingByTypeSerializer(
-            Map.of(
-                byte[].class, new ByteArraySerializer(),
-                MessageAddNumber.class, new JsonSerializer<>(),
-                MessageCalculateSum.class, new JsonSerializer<>())));
-  }
-
-  @Bean
-  public KafkaTemplate<String, Object> kafkaTemplate(
-      ProducerFactory<String, Object> producerFactory)
-  {
-    return new KafkaTemplate<>(producerFactory);
-  }
-
-  @Bean
-  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
-      KafkaOperations<?, ?> kafkaTemplate)
-  {
-    return new DeadLetterPublishingRecoverer(kafkaTemplate);
-  }
-
-  @Bean
-  public DefaultErrorHandler errorHandler(
-      DeadLetterPublishingRecoverer recoverer)
-  {
-    return new DefaultErrorHandler(
-        recoverer,
-        new FixedBackOff(0l, 0l));
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
diff --git a/src/main/java/de/juplo/kafka/ApplicationController.java b/src/main/java/de/juplo/kafka/ApplicationController.java
deleted file mode 100644 (file)
index 0a9890c..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.data.mongodb.core.aggregation.ArithmeticOperators;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-@RestController
-@RequiredArgsConstructor
-public class ApplicationController
-{
-  private final AdderResults results;
-
-
-  @GetMapping("results")
-  public Map<Integer, Map<String, List<AdderResult>>> results()
-  {
-    return results.getState();
-  }
-
-  @GetMapping("results/{user}")
-  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
-  {
-    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
-    {
-      List<AdderResult> results = resultsByUser.get(user);
-      if (results != null)
-        return ResponseEntity.ok(results);
-    }
-
-    return ResponseEntity.notFound().build();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
deleted file mode 100644 (file)
index 0466df4..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.boot.actuate.health.Health;
-import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-
-
-@RequiredArgsConstructor
-public class ApplicationHealthIndicator implements HealthIndicator
-{
-  private final String id;
-  private final KafkaListenerEndpointRegistry registry;
-
-
-  @Override
-  public Health health()
-  {
-    return registry.getListenerContainer(id).isRunning()
-        ? Health.up().build()
-        : Health.down().build();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
deleted file mode 100644 (file)
index 005460c..0000000
+++ /dev/null
@@ -1,23 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.validation.annotation.Validated;
-
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
-import java.time.Duration;
-
-
-@ConfigurationProperties(prefix = "sumup.adder")
-@Validated
-@Getter
-@Setter
-public class ApplicationProperties
-{
-  @NotNull
-  @NotEmpty
-  private String topic;
-  private Duration throttle;
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
deleted file mode 100644 (file)
index ba15227..0000000
+++ /dev/null
@@ -1,71 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.TopicPartition;
-import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
-
-import java.util.*;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListener
-{
-  private final ApplicationRecordHandler recordHandler;
-  private final AdderResults adderResults;
-  private final StateRepository stateRepository;
-  private final String id;
-
-  private final Set<Integer> partitions = new HashSet<>();
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - adding partition: {}", id, partition);
-      this.partitions.add(partition);
-      StateDocument document =
-          stateRepository
-              .findById(Integer.toString(partition))
-              .orElse(new StateDocument(partition));
-      recordHandler.addPartition(partition, document.state);
-      for (String user : document.state.keySet())
-      {
-        log.info(
-            "{} - Restored state for partition={}|user={}: {}",
-            id,
-            partition,
-            user,
-            document.state.get(user));
-      }
-      adderResults.addPartition(partition, document.results);
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - removing partition: {}", id, partition);
-      this.partitions.remove(partition);
-      Map<String, AdderResult> state = recordHandler.removePartition(partition);
-      for (String user : state.keySet())
-      {
-        log.info(
-            "{} - Saved state for partition={}|user={}: {}",
-            id,
-            partition,
-            user,
-            state.get(user));
-      }
-      Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
-      stateRepository.save(new StateDocument(partition, state, results));
-    });
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
deleted file mode 100644 (file)
index 2075781..0000000
+++ /dev/null
@@ -1,84 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaHandler;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@RequiredArgsConstructor
-@Slf4j
-@KafkaListener(
-    id = "${spring.kafka.consumer.group-id}",
-    topics = "${sumup.adder.topic}")
-public class ApplicationRecordHandler
-{
-  private final AdderResults results;
-  private final Optional<Duration> throttle;
-  private final String id;
-
-  private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
-
-
-  @KafkaHandler
-  public void addNumber(
-      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
-      Integer partition,
-      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
-      String user,
-      @Payload
-      MessageAddNumber message)
-  {
-    log.debug("{} - Received {} for {} on {}", id, message, user, partition);
-    state.get(partition).addToSum(user, message.getNext());
-    throttle();
-  }
-
-  @KafkaHandler
-  public void calculateSum(
-      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
-      Integer partition,
-      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
-      String user,
-      @Payload
-      MessageCalculateSum message)
-  {
-    AdderResult result = state.get(partition).calculate(user);
-    log.info("{} - New result for {}: {}", id, user, result);
-    results.addResults(partition, user, result);
-    throttle();
-  }
-
-  private void throttle()
-  {
-    if (throttle.isPresent())
-    {
-      try
-      {
-        Thread.sleep(throttle.get().toMillis());
-      }
-      catch (InterruptedException e)
-      {
-        log.warn("{} - Intrerrupted while throttling: {}", id, e);
-      }
-    }
-  }
-
-  protected void addPartition(Integer partition, Map<String, AdderResult> state)
-  {
-    this.state.put(partition, new AdderBusinessLogic(state));
-  }
-
-  protected Map<String, AdderResult> removePartition(Integer partition)
-  {
-    return this.state.remove(partition).getState();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java
deleted file mode 100644 (file)
index e4999b7..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka;
-
-
-public abstract class Message
-{
-  public enum Type {ADD, CALC}
-
-  public abstract Type getType();
-}
diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java
deleted file mode 100644 (file)
index c024b65..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MessageAddNumber extends Message
-{
-  private Integer next;
-
-
-  @Override
-  public Type getType()
-  {
-    return Type.ADD;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java
deleted file mode 100644 (file)
index afc5a39..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class MessageCalculateSum extends Message
-{
-  @Override
-  public Type getType()
-  {
-    return Type.CALC;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java
deleted file mode 100644 (file)
index ae8eb51..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-@Document(collection = "state")
-@ToString
-public class StateDocument
-{
-  @Id
-  public String id;
-  public Map<String, AdderResult> state;
-  public Map<String, List<AdderResult>> results;
-
-  public StateDocument()
-  {
-  }
-
-  public StateDocument(Integer partition)
-  {
-    this.id = Integer.toString(partition);
-    this.state = new HashMap<>();
-    this.results = new HashMap<>();
-  }
-
-  public StateDocument(
-      Integer partition,
-      Map<String, AdderResult> state,
-      Map<String, List<AdderResult>> results)
-  {
-    this.id = Integer.toString(partition);
-    this.state = state;
-    this.results = results;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java
deleted file mode 100644 (file)
index 3129535..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface StateRepository extends MongoRepository<StateDocument, String>
-{
-  public Optional<StateDocument> findById(String partition);
-}
index a95e976..ee1bb64 100644 (file)
@@ -1,6 +1,3 @@
-sumup:
-  adder:
-    topic: out
 management:
   endpoint:
     shutdown:
@@ -15,25 +12,8 @@ management:
     java:
       enabled: true
 spring:
-  data:
-    mongodb:
-      uri: mongodb://juplo:training@localhost:27017
-      database: juplo
   kafka:
     bootstrap-servers: :9092
-    consumer:
-      group-id: my-group
-      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
-      properties:
-        partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
-        metadata.max.age.ms: 1000
-        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
-        spring.json.type.mapping: >
-          ADD:de.juplo.kafka.MessageAddNumber,
-          CALC:de.juplo.kafka.MessageCalculateSum
-    producer:
-      bootstrap-servers: :9092
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java
deleted file mode 100644 (file)
index 8e49263..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.Arrays;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.*;
-
-
-public class AdderBusinessLogicTest
-{
-  @Test
-  @DisplayName("An empty Optional should be returned, for a non-existing sum")
-  public void testGetSumReturnsEmptyOptionalForNonExistingSum()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThat(adder.getSum("foo")).isEmpty();
-  }
-
-  @Test
-  @DisplayName("A non-empty Optional should be returned, for an existing sum")
-  public void testGetSumReturnsNonEmptyOptionalForExistingSum()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.addToSum("foo", 6);
-    assertThat(adder.getSum("foo")).isNotEmpty();
-  }
-
-  @Test
-  @DisplayName("A sum can be calculated, if it does exist")
-  public void testCalculatePossibleIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.addToSum("foo", 6);
-    assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
-  }
-
-  @Test
-  @DisplayName("An existing sum is removed, if ended")
-  public void testCalculateRemovesSumIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.addToSum("foo", 6);
-    adder.calculate("foo");
-    assertThat(adder.getSum("foo")).isEmpty();
-  }
-
-  @Test
-  @DisplayName("An existing sum returns a non-null value, if calculated")
-  public void testCalculateReturnsNonNullValueIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.addToSum("foo", 6);
-    assertThat(adder.calculate("foo")).isNotNull();
-  }
-
-  @Test
-  @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
-  public void testCalculateCausesExceptionIfNotExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
-  }
-
-  @Test
-  @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
-  public void testAddToSumWithNullValueCausesException()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
-  }
-
-  @ParameterizedTest(name = "{index}: Adding {0}")
-  @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
-  @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
-  public void testAddToSumWithNonPositiveValueCausesException(int value)
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
-  }
-
-  @ParameterizedTest(name = "{index}: Adding {0}")
-  @DisplayName("Can add a positive value to a sum")
-  @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
-  public void testAddToSumWithPositiveValuePossible(int value)
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
-  }
-
-  @ParameterizedTest(name = "{index}: Summing up {0}")
-  @DisplayName("Adds up numbers correctly")
-  @MethodSource("numbersProvider")
-  public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
-  {
-    long expectedResult = Arrays.stream(numbers).sum();
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
-    AdderResult result = adder.calculate("foo");
-    assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
-    assertThat(result.sum).isEqualTo(expectedResult);
-  }
-
-  static Stream<Arguments> numbersProvider() {
-    return Stream.of(
-        Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
-        Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
-        Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java
deleted file mode 100644 (file)
index 4bb4f5b..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.web.client.TestRestTemplate;
-import org.springframework.boot.test.web.server.LocalServerPort;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-
-import static de.juplo.kafka.ApplicationIT.TOPIC;
-
-
-@SpringBootTest(
-    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
-    properties = {
-        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "sumup.adder.topic=" + TOPIC,
-        "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC)
-@AutoConfigureDataMongo
-public class ApplicationIT
-{
-  public static final String TOPIC = "FOO";
-
-  @LocalServerPort
-  private int port;
-
-  @Autowired
-  private TestRestTemplate restTemplate;
-
-
-
-  @Test
-  public void testApplicationStartup()
-  {
-    restTemplate.getForObject(
-        "http://localhost:" + port + "/actuator/health",
-        String.class
-        )
-        .contains("UP");
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
new file mode 100644 (file)
index 0000000..3c77a44
--- /dev/null
@@ -0,0 +1,34 @@
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+
+@SpringBootTest(
+    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+    properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
+@EmbeddedKafka(topics = "out")
+public class ApplicationTests
+{
+  @LocalServerPort
+  private int port;
+
+  @Autowired
+  private TestRestTemplate restTemplate;
+
+
+
+  @Test
+  public void testApplicationStartup()
+  {
+    restTemplate.getForObject(
+        "http://localhost:" + port + "/actuator/health",
+        String.class
+        )
+        .contains("UP");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java
deleted file mode 100644 (file)
index 52794ba..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.Arrays;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.*;
-
-
-public class MessageTest
-{
-  ObjectMapper mapper = new ObjectMapper();
-
-  @Test
-  @DisplayName("Deserialize a MessageAddNumber message")
-  public void testDeserializeMessageAddNumber()
-  {
-    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class));
-    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class));
-  }
-
-  @Test
-  @DisplayName("Deserialize a MessageCalculateSum message")
-  public void testDeserializeMessageCalculateSum() throws JsonProcessingException
-  {
-    Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class));
-    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class));
-  }
-}