package de.juplo.kafka;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import java.util.Properties;
+
@Configuration
@EnableConfigurationProperties(ApplicationProperties.class)
public class ApplicationConfiguration
{
@Bean
- public ExampleConsumer exampleConsumer(ApplicationProperties properties)
+ public ExampleConsumer exampleConsumer(
+ Consumer<String, String> kafkaConsumer,
+ ApplicationProperties properties)
{
return new ExampleConsumer(
- properties.getBroker(),
+ kafkaConsumer,
properties.getTopic(),
- properties.getGroupId(),
properties.getClientId());
}
+
+ @Bean(destroyMethod = "")
+ public Consumer<String, String> kafkaConsumer(ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+
+ props.put("bootstrap.servers", properties.getBroker());
+ props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits
+ props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung
+ props.put("key.deserializer", StringDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ return new KafkaConsumer<>(props);
+ }
}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
-import java.util.Properties;
@Slf4j
private long consumed = 0;
public ExampleConsumer(
- String broker,
+ Consumer<String, String> consumer,
String topic,
- String groupId,
String clientId)
{
- Properties props = new Properties();
- props.put("bootstrap.servers", broker);
- props.put("group.id", groupId); // ID für die Offset-Commits
- props.put("client.id", clientId); // Nur zur Wiedererkennung
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
-
+ this.consumer = consumer;
this.id = clientId;
this.topic = topic;
- consumer = new KafkaConsumer<>(props);
this.worker = new Thread(this, "ConsumerRunner-" + id);
log.info("{} - Starting worker-thread", id);