X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationConfiguration.java;h=62d61a20905dd8c4e31089199f38bc11a5b92332;hb=8f14291a52872141ded7ecead6b1f7f9fbc969e4;hp=46bb66789ce06cda2b5776ffd523c27416becdd4;hpb=5b9f4cd21a87b03cb1c432e9965fb0082ab05dd3;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 46bb667..62d61a2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,43 +1,34 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.Properties; +import org.springframework.kafka.core.ConsumerFactory; @Configuration -@EnableConfigurationProperties({ ApplicationProperties.class }) +@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) public class ApplicationConfiguration { @Bean public SimpleConsumer simpleConsumer( Consumer kafkaConsumer, + KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { return new SimpleConsumer( - applicationProperties.getClientId(), + kafkaProperties.getClientId(), applicationProperties.getTopic(), kafkaConsumer); } @Bean - public Consumer kafkaConsumer(ApplicationProperties properties) + public Consumer kafkaConsumer(ConsumerFactory factory) { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServers()); - props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits - props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung - props.put("auto.offset.reset", properties.getAutoOffsetReset()); - props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - return new KafkaConsumer<>(props); + return factory.createConsumer(); } }