<artifactId>spring-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<name>Spring Consumer</name>
- <description>Super Simple Consumer-Group, that is implemented as a Spring-Boot application</description>
+ <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
<properties>
<java.version>11</java.version>
<optional>true</optional>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>pl.project13.maven</groupId>
- <artifactId>git-commit-id-plugin</artifactId>
- </plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
package de.juplo.kafka;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import java.util.Properties;
+import org.springframework.kafka.core.ConsumerFactory;
@Configuration
-@EnableConfigurationProperties({ ApplicationProperties.class })
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
public class ApplicationConfiguration
{
@Bean
public SimpleConsumer simpleConsumer(
Consumer<String, String> kafkaConsumer,
+ KafkaProperties kafkaProperties,
ApplicationProperties applicationProperties)
{
return
new SimpleConsumer(
- applicationProperties.getClientId(),
+ kafkaProperties.getClientId(),
applicationProperties.getTopic(),
kafkaConsumer);
}
@Bean
- public Consumer<String, String> kafkaConsumer(ApplicationProperties properties)
+ public Consumer<?, ?> kafkaConsumer(ConsumerFactory<?, ?> factory)
{
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServers());
- props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits
- props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung
- props.put("auto.offset.reset", properties.getAutoOffsetReset());
- props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
- return new KafkaConsumer<>(props);
+ return factory.createConsumer();
}
}
@Setter
public class ApplicationProperties
{
- @NotNull
- @NotEmpty
- private String bootstrapServers;
- @NotNull
- @NotEmpty
- private String groupId;
- @NotNull
- @NotEmpty
- private String clientId;
@NotNull
@NotEmpty
private String topic;
- @NotNull
- @NotEmpty
- private String autoOffsetReset;
}
simple:
consumer:
- bootstrap-servers: ":9092"
- group-id: "my-group"
- client-id: "DEV"
topic: test
- auto-offset-reset: earliest
management:
endpoint:
shutdown:
enabled: true
info:
kafka:
- bootstrap-server: ${simple.consumer.bootstrap-servers}
- client-id: ${simple.consumer.client-id}
- group-id: ${simple.consumer.group-id}
+ bootstrap-server: ${spring.kafka.bootstrap-servers}
+ client-id: ${spring.kafka.client-id}
+ group-id: ${spring.kafka.consumer.group-id}
topic: ${simple.consumer.topic}
- auto-offset-reset: ${simple.consumer.auto-offset-reset}
+ auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
+spring:
+ kafka:
+ bootstrap-servers: :9092
+ client-id: DEV
+ consumer:
+ group-id: my-group
+ auto-offset-reset: earliest
+ auto-commit-interval: 5s
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ properties:
+ partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
+ metadata.max.age.ms: 1000
logging:
level:
root: INFO
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import static de.juplo.kafka.ApplicationIT.TOPIC;
+
+
+@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = {
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "simple.consumer.topic=" + TOPIC })
+@EmbeddedKafka(topics = TOPIC)
+public class ApplicationIT
+{
+ public static final String TOPIC = "FOO";
+
+ @LocalServerPort
+ private int port;
+
+ @Autowired
+ private TestRestTemplate restTemplate;
+
+
+
+ @Test
+ public void testApplicationStartup()
+ {
+ restTemplate.getForObject(
+ "http://localhost:" + port + "/actuator/health",
+ String.class
+ )
+ .contains("UP");
+ }
+}