From: Kai Moritz
Date: Mon, 18 Apr 2022 11:47:40 +0000 (+0200)
Subject: Springify: Merge of the switch of the payload to JSON
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=4aef11c37556c827397af6c25f5c129cd0147a68;p=demos%2Fkafka%2Ftraining

Springify: Merge of the switch of the payload to JSON
---

4aef11c37556c827397af6c25f5c129cd0147a68

diff --cc src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index b67f795,431041c..0f02ab1
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@@ -1,11 -1,17 +1,12 @@@
  package de.juplo.kafka;

  import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
  import org.springframework.boot.context.properties.EnableConfigurationProperties;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
++import org.springframework.kafka.core.ConsumerFactory;

-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
  import java.util.function.Consumer;
@@@ -23,8 -29,42 +24,14 @@@ public class ApplicationConfiguratio
  }

  @Bean
- public EndlessConsumer endlessConsumer(
-     KafkaConsumer kafkaConsumer,
-     ExecutorService executor,
-     Consumer> handler,
-     ApplicationProperties properties)
+ public ApplicationErrorHandler errorHandler()
  {
-   return
-       new EndlessConsumer<>(
-           executor,
-           properties.getClientId(),
-           properties.getTopic(),
-           kafkaConsumer,
-           handler);
- }
-
- @Bean
- public ExecutorService executor()
- {
-   return Executors.newSingleThreadExecutor();
+   return new ApplicationErrorHandler();
  }
+
+ @Bean(destroyMethod = "close")
- public KafkaConsumer kafkaConsumer(ApplicationProperties properties)
++ public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory)
+ {
-   Properties props = new Properties();
-
-   props.put("bootstrap.servers", properties.getBootstrapServer());
-   props.put("group.id", properties.getGroupId());
-   props.put("client.id", properties.getClientId());
-   props.put("auto.offset.reset", properties.getAutoOffsetReset());
-   props.put("metadata.max.age.ms", "1000");
-   props.put("key.deserializer", StringDeserializer.class.getName());
-   props.put("value.deserializer", JsonDeserializer.class.getName());
-   props.put(JsonDeserializer.TYPE_MAPPINGS, "message:" + ClientMessage.class.getName());
-   props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
-
-   return new KafkaConsumer<>(props);
++   return factory.createConsumer();
+ }
  }

diff --cc src/main/resources/application.yml
index 1cb6212,9f3cb81..081fe1a
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@@ -24,14 -24,6 +24,17 @@@ info
    group-id: ${consumer.group-id}
    topic: ${consumer.topic}
    auto-offset-reset: ${consumer.auto-offset-reset}
+spring:
+  kafka:
+    consumer:
+      bootstrap-servers: ${consumer.bootstrap-server}
+      client-id: ${consumer.client-id}
+      auto-offset-reset: ${consumer.auto-offset-reset}
+      group-id: ${consumer.group-id}
-      value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
++      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
++      properties:
++        spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
++        spring.json.trusted.packages: "de.juplo.kafka"
  logging:
    level:
      root: INFO

diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java
index 5a0f43d,6c25bcd..ee8e8c4
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@@ -16,7 -15,7 +16,8 @@@ import org.springframework.boot.test.co
  import org.springframework.boot.test.context.TestConfiguration;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Primary;
+ import org.springframework.kafka.support.serializer.JsonSerializer;
  import org.springframework.kafka.test.context.EmbeddedKafka;
  import org.springframework.test.context.TestPropertySource;
  import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@@ -24,9 -23,10 +25,9 @@@
  import java.time.Duration;
  import java.util.*;
  import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
  import java.util.function.BiConsumer;
+ import java.util.function.BiFunction;
  import java.util.function.Consumer;
- import java.util.function.Function;
  import java.util.stream.Collectors;
  import java.util.stream.IntStream;
@@@ -66,13 -63,13 +69,13 @@@ class ApplicationTest
  @Autowired
  ApplicationProperties properties;
  @Autowired
- ExecutorService executor;
+ EndlessConsumer endlessConsumer;
+ @Autowired
+ RecordHandler recordHandler;
- Consumer> testHandler;
- EndlessConsumer endlessConsumer;

  Map oldOffsets;
  Map newOffsets;
- Set> receivedRecords;
+ Set> receivedRecords;


  /** Tests methods */
@@@ -262,36 -286,15 +274,36 @@@
  }
  }

- public static class RecordHandler implements Consumer>
++ public static class RecordHandler implements Consumer>
+ {
-   Consumer> captureOffsets;
-   Consumer> testHandler;
++  Consumer> captureOffsets;
++  Consumer> testHandler;
+
+
+   @Override
-   public void accept(ConsumerRecord record)
++  public void accept(ConsumerRecord record)
+   {
+     captureOffsets
+         .andThen(testHandler)
+         .accept(record);
+   }
+ }


  @TestConfiguration
  @Import(ApplicationConfiguration.class)
  public static class Configuration
  {
+   @Primary
+   @Bean
-   public Consumer> testHandler()
++  public Consumer> testHandler()
+   {
+     return new RecordHandler();
+   }
+
+   @Bean
-   Serializer serializer()
+   Serializer serializer()
    {
-     return new LongSerializer();
+     return new JsonSerializer<>();
    }

    @Bean
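
For orientation, the new spring.kafka.consumer block in application.yml and the ConsumerFactory-based kafkaConsumer bean belong together: Spring Boot builds a DefaultKafkaConsumerFactory from those properties, and the bean merely calls createConsumer() on it. The following sketch shows roughly the equivalent hand-wired setup; it is not part of the commit, the class name JsonConsumerFactorySketch and the method parameters are illustrative assumptions, and it presumes the project's de.juplo.kafka.ClientMessage class is on the classpath.

package de.juplo.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

public class JsonConsumerFactorySketch
{
  // Hypothetical helper, not from the commit: wires up the same JSON settings
  // that the new application.yml section expresses declaratively.
  public static Consumer<String, ClientMessage> createConsumer(
      String bootstrapServer,
      String groupId,
      String clientId)
  {
    Map<String, Object> props = new HashMap<>();
    // Connection settings; in the commit they come from spring.kafka.consumer.* in application.yml.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    // The key stays a plain String, the value is read as JSON.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    // Programmatic counterpart of spring.json.type.mapping and spring.json.trusted.packages
    // from the new application.yml section.
    props.put(JsonDeserializer.TYPE_MAPPINGS, "message:" + ClientMessage.class.getName());
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");

    // Spring Boot assembles such a factory automatically from the YAML settings;
    // the kafkaConsumer bean in the commit only calls factory.createConsumer() on it.
    DefaultKafkaConsumerFactory<String, ClientMessage> factory =
        new DefaultKafkaConsumerFactory<>(props);

    return factory.createConsumer();
  }
}

Configuring the JsonDeserializer through consumer properties mirrors the removed hand-written KafkaConsumer setup above and lets the type mapping "message:de.juplo.kafka.ClientMessage" resolve the record value type from the type headers written by the JsonSerializer that the test configuration now registers.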