Merged improvements from 'deserialization' into 'stored-offsets'
author    Kai Moritz <kai@juplo.de>
          Fri, 12 Aug 2022 15:40:11 +0000 (17:40 +0200)
committer Kai Moritz <kai@juplo.de>
          Fri, 12 Aug 2022 15:40:11 +0000 (17:40 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

@@@ -71,11 -55,10 +71,12 @@@ public class ApplicationConfiguration
      Properties props = new Properties();
  
      props.put("bootstrap.servers", properties.getBootstrapServer());
 +    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
      props.put("group.id", properties.getGroupId());
      props.put("client.id", properties.getClientId());
 +    props.put("enable.auto.commit", false);
      props.put("auto.offset.reset", properties.getAutoOffsetReset());
+     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
      props.put("metadata.max.age.ms", "1000");
      props.put("key.deserializer", StringDeserializer.class.getName());
      props.put("value.deserializer", LongDeserializer.class.getName());
Simple merge
@@@ -26,7 -24,7 +26,6 @@@ import java.util.*
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.function.BiConsumer;
- import java.util.function.Function;
 -import java.util.function.Consumer;
  import java.util.stream.Collectors;
  import java.util.stream.IntStream;
  
@@@ -63,8 -58,8 +62,10 @@@ class ApplicationTests
        @Autowired
        KafkaConsumer<String, Long> kafkaConsumer;
        @Autowired
+       KafkaConsumer<Bytes, Bytes> offsetConsumer;
+       @Autowired
 +      PartitionStatisticsRepository partitionStatisticsRepository;
 +      @Autowired
        ApplicationProperties properties;
        @Autowired
        ExecutorService executor;
  
        /** Helper methods for setting up and running the tests */
  
 -              offsetConsumer.seekToEnd(partitions());
+       void seekToEnd()
+       {
+               offsetConsumer.assign(partitions());
 -                      // seekToEnd() works lazily: it only takes effect on poll()/position()
+               partitions().forEach(tp ->
+               {
 -              // The new positions must be committed!
 -              offsetConsumer.commitSync();
+                       Long offset = offsetConsumer.position(tp);
+                       log.info("New position for {}: {}", tp, offset);
++                      Integer partition = tp.partition();
++                      StatisticsDocument document =
++                                      partitionStatisticsRepository
++                                                      .findById(partition.toString())
++                                                      .orElse(new StatisticsDocument(partition));
++                      document.offset = offset;
++                      partitionStatisticsRepository.save(document);
+               });
+               offsetConsumer.unsubscribe();
+       }
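The new seekToEnd() no longer commits the end positions to Kafka; it persists them through the repository that the application consults on startup. A minimal sketch of the types this relies on, assuming Spring Data MongoDB; the constructor, the String id derived from the partition number, and the public offset field follow the usage above, while the annotations and the collection name are assumptions:

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.repository.MongoRepository;

// Collection name is an assumption; the fields mirror the usage in the diff
@Document(collection = "statistics")
class StatisticsDocument
{
  @Id
  public String id;   // the partition number, stored as a String
  public Long offset; // the persisted position for that partition

  public StatisticsDocument() {}

  public StatisticsDocument(Integer partition)
  {
    this.id = Integer.toString(partition);
  }
}

// findById() and save(), as used above, come with every Spring Data repository
interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
{
}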
        void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
        {
 -              offsetConsumer.assign(partitions());
 -              partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
 -              offsetConsumer.unsubscribe();
 -      }
 +              partitions().forEach(tp ->
 +              {
 +                      String partition = Integer.toString(tp.partition());
 +                      Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
 +                      consumer.accept(tp, offset.orElse(0L));
 +              });
 +      }
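doForCurrentOffsets() now reads the start position of every partition from the repository instead of asking a consumer, falling back to 0 for partitions without a stored document. A usage sketch, with an illustrative map name:

// Capture the stored per-partition offsets before the test logic runs
Map<TopicPartition, Long> offsetsBefore = new HashMap<>();
doForCurrentOffsets((tp, offset) -> offsetsBefore.put(tp, offset));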
  
        List<TopicPartition> partitions()
        {
        @BeforeEach
        public void init()
        {
 -              testHandler = record -> {} ;
 -
+               seekToEnd();
                oldOffsets = new HashMap<>();
                newOffsets = new HashMap<>();
                receivedRecords = new HashSet<>();
  
                        return new KafkaProducer<>(props);
                }
 -                      props.put("group.id", properties.getGroupId());
+               @Bean
+               KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
+               {
+                       Properties props = new Properties();
+                       props.put("bootstrap.servers", properties.getBootstrapServer());
+                       props.put("client.id", "OFFSET-CONSUMER");
++                      props.put("enable.auto.commit", false);
++                      props.put("auto.offset.reset", "latest");
+                       props.put("key.deserializer", BytesDeserializer.class.getName());
+                       props.put("value.deserializer", BytesDeserializer.class.getName());
+                       return new KafkaConsumer<>(props);
+               }
        }
  }
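The offsetConsumer bean deserializes keys and values as raw Bytes, so it can inspect any topic regardless of the payload types, and it deliberately has no group.id because it is used with assign() rather than subscribe(). A usage sketch with a placeholder topic:

// Without a group, position() falls back to auto.offset.reset=latest
// and therefore reports the current log-end offset of each partition
List<TopicPartition> partitions = List.of(new TopicPartition("test", 0));
offsetConsumer.assign(partitions);
for (TopicPartition tp : partitions)
  log.info("Log-end offset for {}: {}", tp, offsetConsumer.position(tp));
offsetConsumer.unsubscribe();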