<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
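With spring-kafka-test on the test classpath, the application context can be booted against an in-memory broker instead of the cluster configured in application.yml. A minimal sketch, assuming a standard @SpringBootTest setup (class name and topic are illustrative, not part of the shown diff):

package de.juplo.kafka;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

// @EmbeddedKafka starts an in-memory broker and publishes its address under
// spring.kafka.bootstrap-servers, so the auto-configured ProducerFactory
// connects to it instead of the :9092 default.
@SpringBootTest
@EmbeddedKafka(topics = "test", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class ApplicationTests
{
  @Test
  void contextLoads()
  {
  }
}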
package de.juplo.kafka;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.clients.producer.Producer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.ProducerFactory;
-import java.util.Properties;
@Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@EnableKafka
public class ApplicationConfiguration
{
@Bean
public RestGateway restGateway(
- ApplicationProperties properties,
- KafkaProducer<String, Integer> kafkaProducer)
+ ApplicationProperties applicationProperties,
+ KafkaProperties kafkaProperties,
+ Producer<String, Integer> kafkaProducer)
{
return
new RestGateway(
- properties.getClientId(),
- properties.getTopic(),
- properties.getPartition(),
+ kafkaProperties.getClientId(),
+ applicationProperties.getTopic(),
+ applicationProperties.getPartition(),
kafkaProducer);
}
@Bean(destroyMethod = "close")
- public KafkaProducer<String, Integer> kafkaProducer(ApplicationProperties properties)
+ public Producer<String, Integer> kafkaProducer(ProducerFactory<String, Integer> factory)
{
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", properties.getClientId());
- props.put("acks", properties.getAcks());
- props.put("batch.size", properties.getBatchSize());
- props.put("delivery.timeout.ms", 20000); // 20 Sekunden
- props.put("request.timeout.ms", 10000); // 10 Sekunden
- props.put("linger.ms", properties.getLingerMs());
- props.put("compression.type", properties.getCompressionType());
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", IntegerSerializer.class.getName());
-
- return new KafkaProducer<>(props);
+ return factory.createProducer();
}
}
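The ProducerFactory injected into kafkaProducer() is not defined anywhere in the project; Spring Boot's Kafka auto-configuration contributes it, built from the spring.kafka.* properties. A rough hand-written equivalent is sketched below for illustration only (it assumes the Spring Boot 2.x KafkaProperties API and is not part of the diff):

package de.juplo.kafka;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

// Illustrative stand-in for what KafkaAutoConfiguration already provides:
// buildProducerProperties() merges bootstrap-servers, client-id, acks,
// batch-size, compression-type, the serializers and everything listed under
// spring.kafka.producer.properties into a single config map.
@Configuration
public class ProducerFactorySketch
{
  @Bean
  public ProducerFactory<String, Integer> producerFactory(KafkaProperties kafkaProperties)
  {
    return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
  }
}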
@ConfigurationProperties(prefix = "sumup.gateway")
@Getter
@Setter
public class ApplicationProperties
{
- @NotNull
- @NotEmpty
- private String bootstrapServer;
- @NotNull
- @NotEmpty
- private String clientId;
@NotNull
@NotEmpty
private String topic;
private Integer partition;
- @NotNull
- @NotEmpty
- private String acks;
- @NotNull
- private Integer batchSize;
- @NotNull
- private Integer lingerMs;
- @NotNull
- @NotEmpty
- private String compressionType;
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
private final String id;
private final String topic;
private final Integer partition;
- private final KafkaProducer<String, Integer> producer;
+ private final Producer<String, Integer> producer;
private long produced = 0;
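// Sketch of how the injected Producer is used (the request handler itself is
// not part of this diff; path, return type and error handling are assumptions,
// and the class is assumed to carry @RestController and @Slf4j as the imports
// above suggest). Switching the field from KafkaProducer to the Producer
// interface does not change the send() call at all.
@PostMapping(path = "{key}")
public void send(@PathVariable String key, @RequestBody Integer value)
{
  ProducerRecord<String, Integer> record = new ProducerRecord<>(topic, partition, key, value);
  producer.send(record, (metadata, e) ->
  {
    if (e == null)
      produced++;
    else
      log.error("could not send {}={}: {}", key, value, e.toString());
  });
}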
sumup:
  gateway:
-    bootstrap-server: :9092
-    client-id: DEV
    topic: test
-    acks: -1
-    batch-size: 16384
-    linger-ms: 0
-    compression-type: gzip
management:
  endpoint:
    shutdown:
      enabled: true
info:
  kafka:
-    bootstrap-server: ${producer.bootstrap-server}
-    client-id: ${producer.client-id}
-    topic: ${producer.topic}
-    acks: ${producer.acks}
-    batch-size: ${producer.batch-size}
-    linger-ms: ${producer.linger-ms}
-    compression-type: ${producer.compression-type}
+    bootstrap-servers: ${spring.kafka.bootstrap-servers}
+    client-id: ${spring.kafka.client-id}
+    group-id: ${spring.kafka.consumer.group-id}
+    auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
+    auto-commit-interval-ms: ${spring.kafka.consumer.properties.auto.commit.interval.ms}
+    topic: ${sumup.gateway.topic}
+    acks: ${spring.kafka.producer.acks}
+    batch-size: ${spring.kafka.producer.batch-size}
+    linger-ms: ${spring.kafka.producer.properties.linger.ms}
+    compression-type: ${spring.kafka.producer.compression-type}
+spring:
+  kafka:
+    bootstrap-servers: :9092
+    client-id: DEV
+    consumer:
+      auto-offset-reset: earliest
+      group-id: my-group
+      properties:
+        auto.commit.interval.ms: 5000
+    producer:
+      acks: -1
+      batch-size: 16384
+      compression-type: gzip
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.IntegerSerializer
+      properties:
+        linger.ms: 0
+        delivery.timeout.ms: 20000 # 20 seconds
+        request.timeout.ms: 10000 # 10 seconds
logging:
  level:
    root: INFO
-    de.juplo: TRACE
+    de.juplo: DEBUG
server:
-  port: 8880
+  port: 8881
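Once the gateway is up on the new port, a record can be pushed through it end to end. A hypothetical smoke test (the /{key} path and the Integer payload follow the handler sketch above, and Jackson is assumed on the client classpath; none of this is part of the shown diff):

package de.juplo.kafka;

import org.springframework.web.client.RestTemplate;

// POST one value to the gateway, which should produce it as key "foo" with
// value 42 to the configured topic "test" on the broker behind :9092.
public class GatewaySmokeTest
{
  public static void main(String[] args)
  {
    new RestTemplate().postForObject("http://localhost:8881/foo", 42, Void.class);
  }
}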