Springify: Merge der Umstellung des Payloads auf JSON
authorKai Moritz <kai@juplo.de>
Mon, 18 Apr 2022 11:47:40 +0000 (13:47 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Apr 2022 14:32:41 +0000 (16:32 +0200)
1  2 
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

@@@ -1,11 -1,17 +1,12 @@@
  package de.juplo.kafka;
  
  import org.apache.kafka.clients.consumer.ConsumerRecord;
 -import org.apache.kafka.common.serialization.LongDeserializer;
 -import org.apache.kafka.common.serialization.StringDeserializer;
+ import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.springframework.boot.context.properties.EnableConfigurationProperties;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
 -import org.springframework.kafka.support.serializer.JsonDeserializer;
++import org.springframework.kafka.core.ConsumerFactory;
  
 -import java.util.Properties;
 -import java.util.concurrent.ExecutorService;
 -import java.util.concurrent.Executors;
  import java.util.function.Consumer;
  
  
@@@ -23,8 -29,42 +24,14 @@@ public class ApplicationConfiguratio
    }
  
    @Bean
 -  public EndlessConsumer<String, ClientMessage> endlessConsumer(
 -      KafkaConsumer<String, ClientMessage> kafkaConsumer,
 -      ExecutorService executor,
 -      Consumer<ConsumerRecord<String, ClientMessage>> handler,
 -      ApplicationProperties properties)
 +  public ApplicationErrorHandler errorHandler()
    {
 -    return
 -        new EndlessConsumer<>(
 -            executor,
 -            properties.getClientId(),
 -            properties.getTopic(),
 -            kafkaConsumer,
 -            handler);
 -  }
 -
 -  @Bean
 -  public ExecutorService executor()
 -  {
 -    return Executors.newSingleThreadExecutor();
 +    return new ApplicationErrorHandler();
    }
 -  public KafkaConsumer<String, ClientMessage> kafkaConsumer(ApplicationProperties properties)
+   @Bean(destroyMethod = "close")
 -    Properties props = new Properties();
 -
 -    props.put("bootstrap.servers", properties.getBootstrapServer());
 -    props.put("group.id", properties.getGroupId());
 -    props.put("client.id", properties.getClientId());
 -    props.put("auto.offset.reset", properties.getAutoOffsetReset());
 -    props.put("metadata.max.age.ms", "1000");
 -    props.put("key.deserializer", StringDeserializer.class.getName());
 -    props.put("value.deserializer", JsonDeserializer.class.getName());
 -    props.put(JsonDeserializer.TYPE_MAPPINGS, "message:" + ClientMessage.class.getName());
 -    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
 -
 -    return new KafkaConsumer<>(props);
++  public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
+   {
++    return factory.createConsumer();
+   }
  }
@@@ -24,14 -24,6 +24,17 @@@ info
      group-id: ${consumer.group-id}
      topic: ${consumer.topic}
      auto-offset-reset: ${consumer.auto-offset-reset}
-       value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
 +spring:
 +  kafka:
 +    consumer:
 +      bootstrap-servers: ${consumer.bootstrap-server}
 +      client-id: ${consumer.client-id}
 +      auto-offset-reset: ${consumer.auto-offset-reset}
 +      group-id: ${consumer.group-id}
++      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
++      properties:
++        spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
++        spring.json.trusted.packages: "de.juplo.kafka"
  logging:
    level:
      root: INFO
@@@ -16,7 -15,7 +16,8 @@@ import org.springframework.boot.test.co
  import org.springframework.boot.test.context.TestConfiguration;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Import;
 +import org.springframework.context.annotation.Primary;
+ import org.springframework.kafka.support.serializer.JsonSerializer;
  import org.springframework.kafka.test.context.EmbeddedKafka;
  import org.springframework.test.context.TestPropertySource;
  import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
  import java.time.Duration;
  import java.util.*;
  import java.util.concurrent.ExecutionException;
 -import java.util.concurrent.ExecutorService;
  import java.util.function.BiConsumer;
+ import java.util.function.BiFunction;
  import java.util.function.Consumer;
- import java.util.function.Function;
  import java.util.stream.Collectors;
  import java.util.stream.IntStream;
  
@@@ -66,13 -63,13 +69,13 @@@ class ApplicationTest
        @Autowired
        ApplicationProperties properties;
        @Autowired
 -      ExecutorService executor;
 +      EndlessConsumer endlessConsumer;
 +      @Autowired
 +      RecordHandler recordHandler;
  
 -      Consumer<ConsumerRecord<String, ClientMessage>> testHandler;
 -      EndlessConsumer<String, ClientMessage> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
-       Set<ConsumerRecord<String, Long>> receivedRecords;
+       Set<ConsumerRecord<String, ClientMessage>> receivedRecords;
  
  
        /** Tests methods */
                }
        }
  
-       public static class RecordHandler implements Consumer<ConsumerRecord<String, Long>>
++      public static class RecordHandler implements Consumer<ConsumerRecord<String, ClientMessage>>
 +      {
-               Consumer<ConsumerRecord<String, Long>> captureOffsets;
-               Consumer<ConsumerRecord<String, Long>> testHandler;
++              Consumer<ConsumerRecord<String, ClientMessage>> captureOffsets;
++              Consumer<ConsumerRecord<String, ClientMessage>> testHandler;
 +
 +
 +              @Override
-               public void accept(ConsumerRecord<String, Long> record)
++              public void accept(ConsumerRecord<String, ClientMessage> record)
 +              {
 +                      captureOffsets
 +                                      .andThen(testHandler)
 +                                      .accept(record);
 +              }
 +      }
  
        @TestConfiguration
        @Import(ApplicationConfiguration.class)
        public static class Configuration
        {
-               public Consumer<ConsumerRecord<String, Long>> testHandler()
 +              @Primary
 +              @Bean
++              public Consumer<ConsumerRecord<String, ClientMessage>> testHandler()
 +              {
 +                      return new RecordHandler();
 +              }
 +
                @Bean
-               Serializer<Long> serializer()
+               Serializer<ClientMessage> serializer()
                {
-                       return new LongSerializer();
+                       return new JsonSerializer<>();
                }
  
                @Bean