private final RecordHandler<String, Long> recordHandler;
private final Thread workerThread;
private final Clock clock;
+ private final Duration pollRequestTimeout;
private final Duration maxPollInterval;
private final Duration minTimeForNextRecord;
private final BackOff backOffStrategy;
Consumer<String, Long> consumer,
RecordHandler<String, Long> recordHandler,
Clock clock,
+ Duration pollRequestTimeout,
Duration maxPollInterval,
Duration maxTimePerRecord,
BackOff backOffStrategy,
this.consumer = consumer;
this.recordHandler = recordHandler;
this.clock = clock;
+ this.pollRequestTimeout = pollRequestTimeout;
this.maxPollInterval = maxPollInterval;
this.minTimeForNextRecord = maxTimePerRecord.multipliedBy(2);
this.backOffStrategy = backOffStrategy;
try
{
ConsumerRecords<String, Long> records =
- consumer.poll(Duration.ofSeconds(1));
+ consumer.poll(pollRequestTimeout);
log.info("{} - Received {} messages", id, records.count());
},
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
"juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
"juplo.consumer.max-time-per-record=" + ERROR_TIMEOUT_MS + "ms",
"juplo.consumer.num-retries=" + NUM_RETRIES,
static final String TOPIC = "ExampleConsumerTest_TEST";
static final int NUM_PARTITIONS = 10;
static final int NUM_RETRIES = 6;
+ static final int POLL_REQUEST_TIMEOUT_MS = 1000;
static final int MAX_POLL_INTERVALL_MS = 5000;
static final int ERROR_TIMEOUT_MS = 1000;
consumer,
mockRecordHandler,
clock,
+ properties.getConsumerProperties().getPollRequestTimeout(),
properties.getConsumerProperties().getMaxPollInterval(),
properties.getConsumerProperties().getMaxTimePerRecord(),
new FixedBackOff(0l, properties.getConsumerProperties().getNumRetries()),