Verbesserungen und Fachlogik-Test aus 'sumup-adder' gemerged
authorKai Moritz <kai@juplo.de>
Tue, 16 Aug 2022 16:58:10 +0000 (18:58 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 17 Aug 2022 20:42:42 +0000 (22:42 +0200)
1  2 
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/StateDocument.java

@@@ -25,15 -32,20 +31,17 @@@ public class ApplicationConfiguratio
    @Bean
    public ApplicationRebalanceListener rebalanceListener(
        ApplicationRecordHandler recordHandler,
+       AdderResults adderResults,
        StateRepository stateRepository,
 -      Consumer<String, String> consumer,
        ApplicationProperties properties)
    {
      return new ApplicationRebalanceListener(
          recordHandler,
+         adderResults,
          stateRepository,
          properties.getClientId(),
 -        properties.getTopic(),
          Clock.systemDefaultZone(),
 -        properties.getCommitInterval(),
 -        consumer);
 +        properties.getCommitInterval());
    }
  
    @Bean
@@@ -16,12 -16,18 +15,15 @@@ import java.util.*
  public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
  {
    private final ApplicationRecordHandler recordHandler;
+   private final AdderResults adderResults;
    private final StateRepository stateRepository;
    private final String id;
 -  private final String topic;
    private final Clock clock;
    private final Duration commitInterval;
 -  private final Consumer<String, String> consumer;
  
+   private final Set<Integer> partitions = new HashSet<>();
    private Instant lastCommit = Instant.EPOCH;
 -  private boolean commitsEnabled = true;
  
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
      partitions.forEach(tp ->
      {
        Integer partition = tp.partition();
 +      log.info("{} - adding partition: {}", id, partition);
+       this.partitions.add(partition);
        StateDocument document =
            stateRepository
                .findById(Integer.toString(partition))
                .orElse(new StateDocument(partition));
 -      log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
 -      if (document.offset >= 0)
 -      {
 -        // Only seek, if a stored offset was found
 -        // Otherwise: Use initial offset, generated by Kafka
 -        consumer.seek(tp, document.offset);
 -      }
        recordHandler.addPartition(partition, document.state);
+       adderResults.addPartition(partition, document.results);
      });
    }
  
      partitions.forEach(tp ->
      {
        Integer partition = tp.partition();
-       Map<String, Long> removed = recordHandler.removePartition(partition);
-       for (String key : removed.keySet())
 +      log.info("{} - removing partition: {}", id, partition);
 -      Long offset = consumer.position(tp);
 -      log.info(
 -          "{} - removing partition: {}, offset of next message {})",
 -          id,
 -          partition,
 -          offset);
 -      if (commitsEnabled)
 -      {
 -        Map<String, AdderResult> state = recordHandler.removePartition(partition);
 -        Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
 -        stateRepository.save(new StateDocument(partition, state, results, offset));
 -      }
 -      else
+       this.partitions.remove(partition);
++      Map<String, AdderResult> state = recordHandler.removePartition(partition);
++      for (String key : state.keySet())
        {
 -        log.info("Offset commits are disabled! Last commit: {}", lastCommit);
 +        log.info(
 +            "{} - Seen {} messages for partition={}|key={}",
 +            id,
-             removed.get(key),
++            state.get(key),
 +            partition,
 +            key);
        }
-       stateRepository.save(new StateDocument(partition, removed));
++      Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
++      stateRepository.save(new StateDocument(partition, state, results));
      });
    }
  
    @Override
    public void beforeNextPoll()
    {
 -    if (!commitsEnabled)
 -    {
 -      log.info("Offset commits are disabled! Last commit: {}", lastCommit);
 -      return;
 -    }
 -
      if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
      {
 -      log.debug("Storing data and offsets, last commit: {}", lastCommit);
 +      log.debug("Storing data, last commit: {}", lastCommit);
-       recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+       partitions.forEach(partition -> stateRepository.save(
            new StateDocument(
-               partiton,
-               adder.getState())));
+               partition,
+               recordHandler.getState(partition).getState(),
 -              adderResults.getState(partition),
 -              consumer.position(new TopicPartition(topic, partition)))));
++              adderResults.getState(partition))));
        lastCommit = clock.instant();
      }
    }
@@@ -14,7 -15,9 +15,8 @@@ public class StateDocumen
  {
    @Id
    public String id;
-   public Map<String, Long> state;
 -  public long offset = -1l;
+   public Map<String, AdderResult> state;
+   public Map<String, List<AdderResult>> results;
  
    public StateDocument()
    {
  
    public StateDocument(
        Integer partition,
-       Map<String, Long> state)
+       Map<String, AdderResult> state,
 -      Map<String, List<AdderResult>> results,
 -      long offset)
++      Map<String, List<AdderResult>> results)
    {
      this.id = Integer.toString(partition);
      this.state = state;
 -    this.offset = offset;
+     this.results = results;
    }
  }