Throttling aus dem `ApplicationRecordHandler` entfernt
authorKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 10:10:38 +0000 (12:10 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 16:09:23 +0000 (18:09 +0200)
* Den Mechanismus zum künstlichen Ausbremsen der Verarbeitung und die
  zugehörige Konfigurations-Option aus dem `ApplicationRecordHandler`
  entfernt, um zu einer möglichst einfachen Anfangs-Lösung zu gelangen.

src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java

index 69a9712..d61009e 100644 (file)
@@ -21,7 +21,6 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.backoff.FixedBackOff;
 
 import java.util.Map;
-import java.util.Optional;
 
 
 @SpringBootApplication
@@ -33,12 +32,10 @@ public class Application
   @Bean
   public ApplicationRecordHandler applicationRecordHandler(
       AdderResults adderResults,
-      KafkaProperties kafkaProperties,
-      ApplicationProperties applicationProperties)
+      KafkaProperties kafkaProperties)
   {
     return new ApplicationRecordHandler(
         adderResults,
-        Optional.ofNullable(applicationProperties.getThrottle()),
         kafkaProperties.getConsumer().getGroupId());
   }
 
index 005460c..940ed5b 100644 (file)
@@ -7,7 +7,6 @@ import org.springframework.validation.annotation.Validated;
 
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
-import java.time.Duration;
 
 
 @ConfigurationProperties(prefix = "sumup.adder")
@@ -19,5 +18,4 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String topic;
-  private Duration throttle;
 }
index 2075781..f082314 100644 (file)
@@ -8,10 +8,8 @@ import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.messaging.handler.annotation.Payload;
 
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 
 @RequiredArgsConstructor
@@ -22,7 +20,6 @@ import java.util.Optional;
 public class ApplicationRecordHandler
 {
   private final AdderResults results;
-  private final Optional<Duration> throttle;
   private final String id;
 
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
@@ -39,7 +36,6 @@ public class ApplicationRecordHandler
   {
     log.debug("{} - Received {} for {} on {}", id, message, user, partition);
     state.get(partition).addToSum(user, message.getNext());
-    throttle();
   }
 
   @KafkaHandler
@@ -54,23 +50,8 @@ public class ApplicationRecordHandler
     AdderResult result = state.get(partition).calculate(user);
     log.info("{} - New result for {}: {}", id, user, result);
     results.addResults(partition, user, result);
-    throttle();
   }
 
-  private void throttle()
-  {
-    if (throttle.isPresent())
-    {
-      try
-      {
-        Thread.sleep(throttle.get().toMillis());
-      }
-      catch (InterruptedException e)
-      {
-        log.warn("{} - Intrerrupted while throttling: {}", id, e);
-      }
-    }
-  }
 
   protected void addPartition(Integer partition, Map<String, AdderResult> state)
   {