Konfig-Parameter zum künstlichen Verzögern der Verabeitung eingebaut
authorKai Moritz <kai@juplo.de>
Mon, 22 Aug 2022 16:24:11 +0000 (18:24 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 10:52:44 +0000 (12:52 +0200)
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java

index 96fda60..c46b00d 100644 (file)
@@ -132,6 +132,7 @@ services:
       sumup.adder.bootstrap-server: kafka:9092
       sumup.adder.client-id: adder-1
       sumup.adder.commit-interval: 3s
+      sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
       logging.level.org.apache.kafka.clients.consumer: DEBUG
@@ -145,6 +146,7 @@ services:
       sumup.adder.bootstrap-server: kafka:9092
       sumup.adder.client-id: adder-2
       sumup.adder.commit-interval: 3s
+      sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
       logging.level.org.apache.kafka.clients.consumer: DEBUG
@@ -156,6 +158,7 @@ services:
       while [[ true ]];
       do
         echo 666 | http -v gateway:8080/peter;
+        sleep 1;
       done
       "
   klaus:
@@ -165,5 +168,6 @@ services:
       while [[ true ]];
       do
         echo 666 | http -v gateway:8080/klaus;
+        sleep 1;
       done
       "
index c1bc019..4d056c4 100644 (file)
@@ -7,6 +7,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import java.time.Clock;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -17,9 +18,13 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public ApplicationRecordHandler recordHandler(AdderResults adderResults)
+  public ApplicationRecordHandler recordHandler(
+      AdderResults adderResults,
+      ApplicationProperties properties)
   {
-    return new ApplicationRecordHandler(adderResults);
+    return new ApplicationRecordHandler(
+        adderResults,
+        Optional.ofNullable(properties.getThrottle()));
   }
 
   @Bean
index 410c623..f852c00 100644 (file)
@@ -33,4 +33,5 @@ public class ApplicationProperties
   private String autoOffsetReset;
   @NotNull
   private Duration commitInterval;
+  private Duration throttle;
 }
index 596f3da..0f5b982 100644 (file)
@@ -4,8 +4,10 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 
 @RequiredArgsConstructor
@@ -13,6 +15,7 @@ import java.util.Map;
 public class ApplicationRecordHandler implements RecordHandler<String, String>
 {
   private final AdderResults results;
+  private final Optional<Duration> throttle;
 
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
@@ -29,10 +32,23 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
       AdderResult result = state.get(partition).calculate(user);
       log.info("New result for {}: {}", user, result);
       results.addResults(partition, user, result);
-      return;
+    }
+    else
+    {
+      state.get(partition).addToSum(user, Integer.parseInt(message));
     }
 
-    state.get(partition).addToSum(user, Integer.parseInt(message));
+    if (throttle.isPresent())
+    {
+      try
+      {
+        Thread.sleep(throttle.get().toMillis());
+      }
+      catch (InterruptedException e)
+      {
+        log.warn("Intrerrupted while throttling: {}", e);
+      }
+    }
   }
 
   protected void addPartition(Integer partition, Map<String, AdderResult> state)