Springify: Der Kafka-`Consumer` wird über die Spring-Factory erzeugt
authorKai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 09:24:55 +0000 (11:24 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 07:49:04 +0000 (09:49 +0200)
* Per `git cherry-pick` aus `springified-consumer--config' übernommen.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/test/java/de/juplo/kafka/ApplicationTests.java
* Damit Spring Kafka den Consumer instanziieren kann, musste insbesondere
  noch der Teil der Konfiguration, der fix ist, aus der Konfig-Klasse
  `ApplicationConfiguration` in die YAML-Datei `application.yml` verschoben
  werden:
** Die Auswahl des `StickyAssignor` als Partition-Assignment-Strategy
** Die Konfiguration der Deserialisierer

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 523707f..bae5d51 100644 (file)
@@ -1,15 +1,14 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
 
 import java.util.Optional;
-import java.util.Properties;
+import org.springframework.kafka.core.ConsumerFactory;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -53,7 +52,7 @@ public class ApplicationConfiguration
 
   @Bean
   public EndlessConsumer<String, Message> endlessConsumer(
-      KafkaConsumer<String, Message> kafkaConsumer,
+      Consumer<String, Message> kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
@@ -77,24 +76,8 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Message> kafkaConsumer(KafkaProperties kafkaProperties)
+  public Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
   {
-    Properties props = new Properties();
-
-    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
-    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
-    props.put("client.id", kafkaProperties.getClientId());
-    props.put("auto.offset.reset", kafkaProperties.getConsumer().getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)kafkaProperties.getConsumer().getAutoCommitInterval().toMillis());
-    props.put("metadata.max.age.ms", kafkaProperties.getConsumer().getProperties().get("metadata.max.age.ms"));
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", JsonDeserializer.class.getName());
-    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
-    props.put(JsonDeserializer.TYPE_MAPPINGS,
-      Message.Type.ADD + ":" + MessageAddNumber.class.getName() + "," +
-      Message.Type.CALC + ":" + MessageCalculateSum.class.getName());
-
-    return new KafkaConsumer<>(props);
+    return factory.createConsumer();
   }
 }
index a899340..92f3a6b 100644 (file)
@@ -33,8 +33,14 @@ spring:
       group-id: my-group
       auto-offset-reset: earliest
       auto-commit-interval: 5s
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
       properties:
+        partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
         metadata.max.age.ms: 1000
+        spring.json.type.mapping: >
+          ADD:de.juplo.kafka.MessageAddNumber,
+          CALC:de.juplo.kafka.MessageCalculateSum
 logging:
   level:
     root: INFO
index 869b5d9..21c3f7f 100644 (file)
@@ -14,6 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
@@ -38,7 +39,11 @@ import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;
 
 
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@SpringJUnitConfig(
+  initializers = ConfigDataApplicationContextInitializer.class,
+  classes = {
+      KafkaAutoConfiguration.class,
+      ApplicationTests.Configuration.class })
 @TestPropertySource(
                properties = {
                                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
@@ -56,13 +61,13 @@ abstract class GenericApplicationTests<K, V>
 
 
        @Autowired
-       KafkaConsumer<K, V> kafkaConsumer;
+       org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
        @Autowired
        Consumer<ConsumerRecord<K, V>> consumer;
        @Autowired
        ApplicationProperties applicationProperties;
-  @Autowired
-  KafkaProperties kafkaProperties;
+       @Autowired
+       KafkaProperties kafkaProperties;
        @Autowired
        ExecutorService executor;
        @Autowired