From: Kai Moritz Date: Sat, 9 Apr 2022 09:46:51 +0000 (+0200) Subject: Refaktorisierung für Tests - Bean-Definition in Config-Klasse verschoben X-Git-Tag: deserialization-synchroner-test~12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f9890a95d6672e1847e1d9f53a76c95ade877a9b;p=demos%2Fkafka%2Ftraining Refaktorisierung für Tests - Bean-Definition in Config-Klasse verschoben --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 5226d6b..0069257 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,55 +1,12 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; - -import java.util.Properties; -import java.util.concurrent.Executors; @SpringBootApplication -@EnableConfigurationProperties(ApplicationProperties.class) public class Application { - @Bean - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, - ApplicationProperties properties) - { - EndlessConsumer consumer = - new EndlessConsumer( - Executors.newFixedThreadPool(1), - properties.getClientId(), - properties.getTopic(), - kafkaConsumer); - - consumer.start(); - - return consumer; - } - - @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) - { - Properties props = new Properties(); - - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("group.id", properties.getGroupId()); - props.put("client.id", properties.getClientId()); - props.put("auto.offset.reset", properties.getAutoOffsetReset()); - props.put("metadata.max.age.ms", "1000"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", LongDeserializer.class.getName()); - - return new KafkaConsumer<>(props); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 0000000..6e04453 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,50 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; +import java.util.concurrent.Executors; + + +@Configuration +@EnableConfigurationProperties(ApplicationProperties.class) +public class ApplicationConfiguration +{ + @Bean + public EndlessConsumer endlessConsumer( + KafkaConsumer kafkaConsumer, + ApplicationProperties properties) + { + EndlessConsumer consumer = + new EndlessConsumer( + Executors.newFixedThreadPool(1), + properties.getClientId(), + properties.getTopic(), + kafkaConsumer); + + consumer.start(); + + return consumer; + } + + @Bean(destroyMethod = "close") + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + { + Properties props = new Properties(); + + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("group.id", properties.getGroupId()); + props.put("client.id", properties.getClientId()); + props.put("auto.offset.reset", properties.getAutoOffsetReset()); + props.put("metadata.max.age.ms", "1000"); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", LongDeserializer.class.getName()); + + return new KafkaConsumer<>(props); + } +}