]> juplo.de Git - demos/kafka/training/commitdiff
Throttling aus dem `ApplicationRecordHandler` entfernt
authorKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 10:10:38 +0000 (12:10 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 16:09:23 +0000 (18:09 +0200)
* Den Mechanismus zum künstlichen Ausbremsen der Verarbeitung und die
  zugehörige Konfigurations-Option aus dem `ApplicationRecordHandler`
  entfernt, um zu einer möglichst einfachen Anfangs-Lösung zu gelangen.

src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java

index 69a97125726ef3a84ac885d226bc69db2051cc4f..d61009efa4e9ef9d7b5116e63f2636b0d6588936 100644 (file)
@@ -21,7 +21,6 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.backoff.FixedBackOff;
 
 import java.util.Map;
-import java.util.Optional;
 
 
 @SpringBootApplication
@@ -33,12 +32,10 @@ public class Application
   @Bean
   public ApplicationRecordHandler applicationRecordHandler(
       AdderResults adderResults,
-      KafkaProperties kafkaProperties,
-      ApplicationProperties applicationProperties)
+      KafkaProperties kafkaProperties)
   {
     return new ApplicationRecordHandler(
         adderResults,
-        Optional.ofNullable(applicationProperties.getThrottle()),
         kafkaProperties.getConsumer().getGroupId());
   }
 
index 005460c7a1ad3d5c3d1b702c6d053872bfbf993f..940ed5b81722df72bba776ff2b63a4546d3cabc7 100644 (file)
@@ -7,7 +7,6 @@ import org.springframework.validation.annotation.Validated;
 
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
-import java.time.Duration;
 
 
 @ConfigurationProperties(prefix = "sumup.adder")
@@ -19,5 +18,4 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String topic;
-  private Duration throttle;
 }
index 207578193005d77640d78b9b30ea08f25153d5fc..f08231414a0d206172129892206fbfa35a8ab626 100644 (file)
@@ -8,10 +8,8 @@ import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.messaging.handler.annotation.Payload;
 
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 
 @RequiredArgsConstructor
@@ -22,7 +20,6 @@ import java.util.Optional;
 public class ApplicationRecordHandler
 {
   private final AdderResults results;
-  private final Optional<Duration> throttle;
   private final String id;
 
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
@@ -39,7 +36,6 @@ public class ApplicationRecordHandler
   {
     log.debug("{} - Received {} for {} on {}", id, message, user, partition);
     state.get(partition).addToSum(user, message.getNext());
-    throttle();
   }
 
   @KafkaHandler
@@ -54,23 +50,8 @@ public class ApplicationRecordHandler
     AdderResult result = state.get(partition).calculate(user);
     log.info("{} - New result for {}: {}", id, user, result);
     results.addResults(partition, user, result);
-    throttle();
   }
 
-  private void throttle()
-  {
-    if (throttle.isPresent())
-    {
-      try
-      {
-        Thread.sleep(throttle.get().toMillis());
-      }
-      catch (InterruptedException e)
-      {
-        log.warn("{} - Intrerrupted while throttling: {}", id, e);
-      }
-    }
-  }
 
   protected void addPartition(Integer partition, Map<String, AdderResult> state)
   {