sumup.adder.bootstrap-server: kafka:9092
sumup.adder.client-id: adder-1
sumup.adder.commit-interval: 3s
+ sumup.adder.throttle: 3ms
spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
spring.data.mongodb.database: juplo
logging.level.org.apache.kafka.clients.consumer: DEBUG
sumup.adder.bootstrap-server: kafka:9092
sumup.adder.client-id: adder-2
sumup.adder.commit-interval: 3s
+ sumup.adder.throttle: 3ms
spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
spring.data.mongodb.database: juplo
logging.level.org.apache.kafka.clients.consumer: DEBUG
while [[ true ]];
do
echo 666 | http -v gateway:8080/peter;
+ sleep 1;
done
"
klaus:
while [[ true ]];
do
echo 666 | http -v gateway:8080/klaus;
+ sleep 1;
done
"
import org.springframework.context.annotation.Configuration;
import java.time.Clock;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ApplicationConfiguration
{
@Bean
- public ApplicationRecordHandler recordHandler(AdderResults adderResults)
+ public ApplicationRecordHandler recordHandler(
+ AdderResults adderResults,
+ ApplicationProperties properties)
{
- return new ApplicationRecordHandler(adderResults);
+ return new ApplicationRecordHandler(
+ adderResults,
+ Optional.ofNullable(properties.getThrottle()));
}
@Bean
private String autoOffsetReset;
@NotNull
private Duration commitInterval;
+ private Duration throttle;
}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
@RequiredArgsConstructor
public class ApplicationRecordHandler implements RecordHandler<String, String>
{
private final AdderResults results;
+ private final Optional<Duration> throttle;
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
AdderResult result = state.get(partition).calculate(user);
log.info("New result for {}: {}", user, result);
results.addResults(partition, user, result);
- return;
+ }
+ else
+ {
+ state.get(partition).addToSum(user, Integer.parseInt(message));
}
- state.get(partition).addToSum(user, Integer.parseInt(message));
+ if (throttle.isPresent())
+ {
+ try
+ {
+ Thread.sleep(throttle.get().toMillis());
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Intrerrupted while throttling: {}", e);
+ }
+ }
}
protected void addPartition(Integer partition, Map<String, AdderResult> state)