<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
package de.juplo.kafka;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.ConsumerFactory;
+import java.util.Properties;
@Configuration
-@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@EnableConfigurationProperties(ApplicationProperties.class)
public class ApplicationConfiguration
{
@Bean
public ExampleConsumer exampleConsumer(
- Consumer<String, String> kafkaConsumer,
- KafkaProperties kafkaProperties,
- ApplicationProperties applicationProperties)
+ KafkaConsumer<String, String> kafkaConsumer,
+ ApplicationProperties properties)
{
return
new ExampleConsumer(
kafkaConsumer,
- kafkaProperties.getClientId(),
- applicationProperties.getTopics());
+ properties.getClientId(),
+ properties.getTopics());
}
@Bean
- public Consumer<?, ?> kafkaConsumer(ConsumerFactory<?, ?> factory)
+ public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
{
- return factory.createConsumer();
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("client.id", properties.getClientId());
+ props.put("group.id", properties.getGroupId());
+ if (properties.getAutoOffsetReset() != null)
+ {
+ props.put("auto.offset.reset", properties.getAutoOffsetReset().name());
+ }
+ if (properties.autoCommitInterval != null)
+ {
+ props.put("auto.commit.interval", properties.getAutoCommitInterval());
+ }
+ props.put("metadata.maxage.ms", 5000); // 5 Sekunden
+ props.put("partition.assignment.strategy", StickyAssignor.class.getName());
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ return new KafkaConsumer<String, String>(props);
}
}
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
+import java.time.Duration;
+
@ConfigurationProperties(prefix = "juplo.consumer")
@Validated
@Setter
public class ApplicationProperties
{
+ @NotNull
+ @NotEmpty
+ private String bootstrapServer;
+ @NotNull
+ @NotEmpty
+ private String clientId;
+ @NotNull
+ @NotEmpty
+ private String groupId;
@NotNull
@NotEmpty
private String[] topics;
+ ApplicationProperties.OffsetReset autoOffsetReset;
+ Duration autoCommitInterval;
+
+ enum OffsetReset { latest, earliest, none}
}
juplo:
consumer:
+ bootstrap-server: :9092
+ client-id: DEV
+ group-id: my-group
topics: test
+ auto-offset-reset: earliest
+ auto-commit-interval: 5s
management:
endpoint:
shutdown:
enabled: true
info:
kafka:
- bootstrap-server: ${spring.kafka.bootstrap-servers}
- client-id: ${spring.kafka.client-id}
- group-id: ${spring.kafka.consumer.group-id}
+ bootstrap-server: ${juplo.consumer.bootstrap-server}
+ client-id: ${juplo.consumer.client-id}
+ group-id: ${juplo.consumer.group-id}
topics: ${juplo.consumer.topics}
- auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
-spring:
- kafka:
- bootstrap-servers: :9092
- client-id: DEV
- consumer:
- group-id: my-group
- auto-offset-reset: earliest
- auto-commit-interval: 5s
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- properties:
- partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
- metadata.max.age.ms: 1000
+ auto-offset-reset: ${juplo.consumer.auto-offset-reset}
+ auto-commit-interval: ${juplo.consumer.auto-commit-interval}
logging:
level:
root: INFO