import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.util.backoff.BackOff;
+import org.springframework.util.backoff.FixedBackOff;
+import java.time.Clock;
import java.util.Properties;
Consumer<String, Long> kafkaConsumer,
RecordHandler<String, Long> recordHandler,
ApplicationProperties properties,
+ Clock clock,
+ BackOff backOffStrategy,
ConfigurableApplicationContext applicationContext)
{
return
properties.getConsumerProperties().getTopic(),
kafkaConsumer,
recordHandler,
+ clock,
+ properties.getConsumerProperties().getMaxPollInterval(),
+ properties.getConsumerProperties().getMaxTimePerRecord(),
+ backOffStrategy,
() -> applicationContext.close());
}
@Bean
- public RecordHandler<String, Long> recordHandler(ApplicationProperties properties)
+ public RecordHandler<String, Long> recordHandler()
{
return (topic, partition, offset, key, value) -> log.info("No-op handler called for {}={}", key, value);
}
+ @Bean
+ public BackOff backOffStrategy(ApplicationProperties properties)
+ {
+ return new FixedBackOff(0L, properties.getConsumerProperties().getNumRetries());
+ }
+
+ @Bean
+ public Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
@Bean(destroyMethod = "") // suppress Spring's automatic close(): the consumer is shut down explicitly elsewhere
public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
{
if (properties.getConsumerProperties().getAutoOffsetReset() != null)
{
props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
}
- if (properties.getConsumerProperties().getAutoCommitInterval() != null)
- {
- props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
- }
+ props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
props.put("metadata.maxage.ms", 5000); // 5 Sekunden
+ props.put("max.poll.interval.ms", (int) properties.getConsumer().getMaxPollInterval().toMillis());
+ props.put("max.poll.interval.records", properties.getConsumer().getMaxPollRecords());
props.put("partition.assignment.strategy", StickyAssignor.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", LongDeserializer.class.getName());
@NotEmpty
private String topic;
private OffsetReset autoOffsetReset;
+ @NotNull
private Duration autoCommitInterval;
+ @NotNull
+ private Duration maxPollInterval;
+ @NotNull
+ private Integer maxPollRecords;
+ @NotNull
+ private Duration maxTimePerRecord;
+ @NotNull
+ private Integer numRetries;
enum OffsetReset { latest, earliest, none }
}
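The FixedBackOff configured above retries immediately (interval 0L) and gives up after numRetries attempts. As a minimal, standalone sketch of how Spring's BackOff API is consumed (the class name BackOffDemo is made up for illustration):

import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

public class BackOffDemo
{
  public static void main(String[] args)
  {
    BackOff backOff = new FixedBackOff(0L, 3);
    // One execution is started per failing record
    BackOffExecution execution = backOff.start();
    long delay;
    while ((delay = execution.nextBackOff()) != BackOffExecution.STOP)
    {
      // nextBackOff() yields 0 three times, then BackOffExecution.STOP (-1)
      System.out.println("retry after " + delay + "ms");
    }
    System.out.println("retries exhausted");
  }
}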
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.util.backoff.BackOff;
+import java.time.Clock;
import java.time.Duration;
import java.util.Arrays;
String topic,
Consumer<String, Long> consumer,
RecordHandler<String, Long> recordHandler,
+ Clock clock,
+ Duration maxPollInterval,
+ Duration maxTimePerRecord,
+ BackOff backOffStrategy,
Runnable closeCallback)
{
this.id = clientId;
Integer partition,
Long offset,
String key,
- Long value)
+ Long value) throws RetriableErrorException, NonRetriableErrorException
{
consumed++;
log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
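The retry loop itself is not part of this excerpt. One plausible shape of it, assuming ExampleConsumer keeps the injected clock, maxTimePerRecord and backOffStrategy in fields of the same names and that java.time.Instant and BackOffExecution are imported (all other identifiers here are illustrative, not the project's actual code):

// Illustrative sketch only -- not the class's actual implementation
private void handleRecordWithRetries(ConsumerRecord<String, Long> record) throws NonRetriableErrorException
{
  Instant deadline = clock.instant().plus(maxTimePerRecord);
  BackOffExecution execution = backOffStrategy.start();
  while (true)
  {
    try
    {
      handleRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value());
      return;
    }
    catch (RetriableErrorException e)
    {
      long delay = execution.nextBackOff();
      if (delay == BackOffExecution.STOP || clock.instant().plusMillis(delay).isAfter(deadline))
      {
        // Retries or the per-record time budget are exhausted
        throw new NonRetriableErrorException("Giving up on offset " + record.offset() + ": " + e.getMessage());
      }
      try
      {
        Thread.sleep(delay);
      }
      catch (InterruptedException ie)
      {
        Thread.currentThread().interrupt();
        throw new NonRetriableErrorException("Interrupted while backing off");
      }
    }
  }
}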
--- /dev/null
+package de.juplo.kafka;
+
+public class NonRetriableErrorException extends Exception
+{
+ public NonRetriableErrorException(String message)
+ {
+ super(message);
+ }
+}
Integer partition,
Long offset,
K key,
- V value);
+ V value) throws RetriableErrorException, NonRetriableErrorException;
}
--- /dev/null
+package de.juplo.kafka;
+
+public class RetriableErrorException extends Exception
+{
+ public RetriableErrorException(String message)
+ {
+ super(message);
+ }
+}
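With the two exception types in place, a RecordHandler can classify its failures instead of letting everything crash the consumer. A hypothetical handler (storeValue and TransientStoreException do not exist in the project; they stand in for real business code):

RecordHandler<String, Long> handler = (topic, partition, offset, key, value) ->
{
  if (value == null)
  {
    // Poison pill: retrying cannot fix a missing value
    throw new NonRetriableErrorException(topic + "/" + partition + "@" + offset + " has no value");
  }
  try
  {
    storeValue(key, value); // hypothetical business call
  }
  catch (TransientStoreException e) // hypothetical exception type
  {
    // Temporary outage: worth retrying with the configured BackOff
    throw new RetriableErrorException(e.getMessage());
  }
};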
topic: test
auto-offset-reset: earliest
auto-commit-interval: 5s
+ max-poll-interval: 5m # handed to max.poll.interval.ms and used by the consumer's retry logic
+ max-poll-records: 500 # handed to max.poll.records
+ max-time-per-record: 30s # time budget for retrying a single record
+ num-retries: 10 # maximum attempts of the FixedBackOff
management:
endpoint:
shutdown:
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.util.backoff.FixedBackOff;
+import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@BeforeEach
- void createExampleConsumer(@Autowired ApplicationProperties properties)
+ void createExampleConsumer(
+ @Autowired ApplicationProperties properties,
+ @Autowired Clock clock)
{
ApplicationConfiguration configuration = new ApplicationConfiguration();
Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
TOPIC,
consumer,
mockRecordHandler,
+ clock,
+ properties.getConsumerProperties().getMaxPollInterval(),
+ properties.getConsumerProperties().getMaxTimePerRecord(),
+ new FixedBackOff(0L, properties.getConsumerProperties().getNumRetries()),
() -> isTerminatedExceptionally.set(true));
}
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
return AdminClient.create(properties);
}
+
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
}
}
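The test configuration simply reuses Clock.systemDefaultZone(). If a test needs to drive the per-record deadline deterministically, java.time offers fixed and offset clocks, e.g.:

Clock fixed = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneId.of("UTC"));
Clock late = Clock.offset(fixed, Duration.ofMinutes(10)); // the same instant, shifted by 10 minutes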