properties.getConsumerProperties().getPollRequestTimeout(),
properties.getConsumerProperties().getMaxPollInterval(),
properties.getConsumerProperties().getMaxTimePerRecord(),
+ properties.getConsumerProperties().getMinSlackPerPollInterval(),
backOffStrategy,
() -> applicationContext.close());
}
Duration pollRequestTimeout,
Duration maxPollInterval,
Duration maxTimePerRecord,
+ Duration minSlackPerPollInterval,
BackOff backOffStrategy,
Runnable closeCallback)
{
this.clock = clock;
this.pollRequestTimeout = pollRequestTimeout;
this.maxPollInterval = maxPollInterval;
- this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2);
+ this.minTimeForNextRecord = maxTimePerRecord.plus(minSlackPerPollInterval);
this.backOffStrategy = backOffStrategy;
int numPartitions = consumer.partitionsFor(topic).size();
"juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
"juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
"juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms",
+ "juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms",
"juplo.consumer.num-retries=" + NUM_RETRIES,
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"logging.level.de.juplo.kafka=TRACE",
static final int POLL_REQUEST_TIMEOUT_MS = 50;
static final int MAX_POLL_INTERVALL_MS = 500;
static final int MAX_TIME_PER_RECORD_MS = 100;
+ static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100;
@Autowired
KafkaTemplate<String, byte[]> kafkaTemplate;
properties.getConsumerProperties().getPollRequestTimeout(),
properties.getConsumerProperties().getMaxPollInterval(),
properties.getConsumerProperties().getMaxTimePerRecord(),
+ properties.getConsumerProperties().getMinSlackPerPollInterval(),
backOff,
() -> isTerminatedExceptionally.set(true));
}