<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
package de.juplo.kafka;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.StickyAssignor;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-
-import java.util.Properties;
+import org.springframework.kafka.core.ConsumerFactory;
@Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
public class ApplicationConfiguration
{
@Bean
public ExampleConsumer exampleConsumer(
Consumer<String, String> kafkaConsumer,
- ApplicationProperties properties,
+ @Value("${spring.kafka.client-id}") String clientId,
+ @Value("${juplo.consumer.topic}") String topic,
ConfigurableApplicationContext applicationContext)
{
return
new ExampleConsumer(
- properties.getClientId(),
- properties.getConsumerProperties().getTopic(),
+ clientId,
+ topic,
kafkaConsumer,
() -> applicationContext.close());
}
@Bean(destroyMethod = "")
- public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+ public Consumer<?, ?> kafkaConsumer(ConsumerFactory<?, ?> consumerFactory)
{
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", properties.getClientId());
- props.put("group.id", properties.getConsumerProperties().getGroupId());
- if (properties.getConsumerProperties().getAutoOffsetReset() != null)
- {
- props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
- }
- if (properties.getConsumerProperties().getAutoCommitInterval() != null)
- {
- props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
- }
- props.put("metadata.maxage.ms", 5000); // 5 Sekunden
- props.put("partition.assignment.strategy", StickyAssignor.class.getName());
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- return new KafkaConsumer<>(props);
+ return consumerFactory.createConsumer();
}
}
+++ /dev/null
-package de.juplo.kafka;
-
-import jakarta.validation.constraints.NotEmpty;
-import jakarta.validation.constraints.NotNull;
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.validation.annotation.Validated;
-
-import java.time.Duration;
-
-
-@ConfigurationProperties(prefix = "juplo")
-@Validated
-@Getter
-@Setter
-public class ApplicationProperties
-{
- @NotNull
- @NotEmpty
- private String bootstrapServer;
- @NotNull
- @NotEmpty
- private String clientId;
-
- @NotNull
- private ConsumerProperties consumer;
-
-
- public ConsumerProperties getConsumerProperties()
- {
- return consumer;
- }
-
-
- @Validated
- @Getter
- @Setter
- static class ConsumerProperties
- {
- @NotNull
- @NotEmpty
- private String groupId;
- @NotNull
- @NotEmpty
- private String topic;
- private OffsetReset autoOffsetReset;
- private Duration autoCommitInterval;
-
- enum OffsetReset { latest, earliest, none }
- }
-}