WIP: commit offsets via the transactional producer instead of auto-commit
author    Kai Moritz <kai@juplo.de>
          Sat, 3 Jun 2023 09:50:05 +0000 (11:50 +0200)
committer Kai Moritz <kai@juplo.de>
          Sat, 17 Jun 2023 12:43:01 +0000 (14:43 +0200)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 033d0cc..ef3db82 100644
@@ -9,9 +9,11 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.time.Clock;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 
 @Configuration
@@ -32,17 +34,23 @@ public class ApplicationConfiguration
   @Bean
   public EndlessConsumer<String, Integer> endlessConsumer(
       KafkaConsumer<String, Integer> kafkaConsumer,
+      KafkaProducer<String, Object> kafkaProducer,
       ExecutorService executor,
       ApplicationRecordHandler recordHandler,
-      ApplicationProperties properties)
+      ApplicationProperties properties,
+      Clock clock)
   {
     return
         new EndlessConsumer<>(
             executor,
             properties.getClientId(),
             properties.getTopicIn(),
+            properties.getPartitions(),
             kafkaConsumer,
-            recordHandler);
+            kafkaProducer,
+            recordHandler,
+            clock,
+            properties.getCommitInterval().toMillis());
   }
 
   @Bean
@@ -61,7 +69,7 @@ public class ApplicationConfiguration
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
+    props.put("enable.auto.commit", false);
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", IntegerDeserializer.class.getName());
@@ -90,4 +98,10 @@
 
     return new KafkaProducer<>(props);
   }
+
+  @Bean
+  public Clock clock()
+  {
+    return Clock.systemDefaultZone();
+  }
 }
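
A note on the new commit path: sendOffsetsToTransaction() may only be called between beginTransaction() and commitTransaction(), on a producer that was created with a transactional.id and initialized once via initTransactions(). Neither is visible in this WIP state yet. A minimal sketch of the assumed surrounding lifecycle (error handling simplified, all names hypothetical):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

class TransactionalCommitSketch
{
  // Sketch only, not part of this commit: the producer must have been
  // created with transactional.id set and initTransactions() called once.
  static void commitInTransaction(
      KafkaProducer<String, Object> producer,
      Consumer<String, Integer> consumer,
      Map<TopicPartition, OffsetAndMetadata> offsets)
  {
    producer.beginTransaction();
    try
    {
      // ... send() any output records here, then attach the offsets
      // gathered by the OffsetRecorder to the same transaction ...
      producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
      producer.commitTransaction();
    }
    catch (RuntimeException e)
    {
      // Simplified: a fenced or fatally failed producer must be closed instead
      producer.abortTransaction();
      throw e;
    }
  }
}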
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index ccddc81..c7b0b8e 100644
@@ -30,6 +30,8 @@ public class ApplicationProperties
  private String topicIn;
  @NotNull
+  private Integer partitions;
+  @NotNull
  @NotEmpty
  private String autoOffsetReset;
   @NotNull
   private Duration commitInterval;
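
The new partitions property must match the partition count of the input topic (topicIn), because the OffsetRecorder introduced below sizes its arrays with it. Alternatively, the count could be taken from the broker metadata instead of the configuration; a possible helper (hypothetical, not part of this commit):

import org.apache.kafka.clients.consumer.Consumer;

class PartitionCountSketch
{
  // Derive the partition count of a topic from the broker metadata
  // instead of requiring a manually maintained configuration value
  static int partitionCount(Consumer<?, ?> consumer, String topic)
  {
    return consumer.partitionsFor(topic).size();
  }
}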
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 63a2f93..3e98842 100644
@@ -3,11 +3,13 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import javax.annotation.PreDestroy;
+import java.time.Clock;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -24,14 +26,19 @@ public class EndlessConsumer<K, V> implements Runnable
   private final ExecutorService executor;
   private final String id;
   private final String topic;
+  private final int partitions;
   private final Consumer<K, V> consumer;
+  private final Producer<?, ?> producer;
   private final RecordHandler<K, V> handler;
+  private final Clock clock;
+  private final long autoCommitIntervalMs;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
   private boolean running = false;
   private Exception exception;
   private long consumed = 0;
+  private long lastCommit = 0;
 
 
 
@@ -40,8 +47,10 @@ public class EndlessConsumer<K, V> implements Runnable
   {
     try
     {
+      OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), offsetRecorder);
 
       while (true)
       {
@@ -62,8 +71,17 @@
               record.value()
           );
 
+          offsetRecorder.recordOffset(record.partition(), record.offset());
           handler.accept(record);
 
+          long now = clock.millis();
+          if (now - lastCommit >= autoCommitIntervalMs)
+          {
+            producer.sendOffsetsToTransaction(
+                offsetRecorder.getOffsets(),
+                consumer.groupMetadata());
+            lastCommit = now;
+          }
           consumed++;
         }
       }
@@ -99,6 +117,90 @@
     }
   }
 
+  class OffsetRecorder implements ConsumerRebalanceListener
+  {
+    private final String topic;
+    private final boolean[] activePartitions;
+    private final long[] offsets;
+
+
+    OffsetRecorder(String topic, int partitions)
+    {
+      this.topic = topic;
+      activePartitions = new boolean[partitions];
+      offsets = new long[partitions];
+      for (int i=0; i< partitions; i++)
+      {
+        offsets[i] = -1;
+        activePartitions[i] = false;
+      }
+    }
+
+
+    void recordOffset(int partition, long offset)
+    {
+      if (!activePartitions[partition])
+        throw new IllegalStateException("Partition " + partition + " is not active!");
+
+      // Committed offsets denote the position of the *next* record to consume
+      offsets[partition] = offset + 1;
+    }
+
+    Map<TopicPartition, OffsetAndMetadata> getOffsets()
+    {
+      Map<TopicPartition, OffsetAndMetadata> recordedOffsets = new HashMap<>();
+
+      for (int i=0; i<offsets.length; i++)
+      {
+        if (activePartitions[i] && offsets[i] > -1)
+          recordedOffsets.put(
+              new TopicPartition(topic, i),
+              new OffsetAndMetadata(offsets[i]));
+      }
+
+      return recordedOffsets;
+    }
+
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> assignedPartitions)
+    {
+      assignedPartitions.forEach(topicPartition ->
+      {
+        log.info("Activating partition {}", topicPartition);
+        activePartitions[topicPartition.partition()] = true;
+        offsets[topicPartition.partition()] = -1;
+      });
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> revokedPartitions)
+    {
+      Map<TopicPartition, OffsetAndMetadata> revokedOffsets = new HashMap<>();
+
+      revokedPartitions.forEach(topicPartition ->
+      {
+        log.info("Committing & deactivating revoked partition {}", topicPartition);
+        activePartitions[topicPartition.partition()] = false;
+        if (offsets[topicPartition.partition()] > -1)
+          revokedOffsets.put(
+              topicPartition,
+              new OffsetAndMetadata(offsets[topicPartition.partition()]));
+      });
+
+      producer.sendOffsetsToTransaction(revokedOffsets, consumer.groupMetadata());
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> lostPartitions)
+    {
+      lostPartitions.forEach(topicPartition ->
+      {
+        log.info("Deactivating lost partition {}", topicPartition);
+        activePartitions[topicPartition.partition()] = false;
+      });
+    }
+  }
+
   private void shutdown()
   {
     shutdown(null);
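
Injecting a Clock instead of reading System.currentTimeMillis() directly is what makes the new interval check in run() testable. A test could pass in a manually advanced clock; a minimal sketch (the class below is hypothetical, not part of this commit):

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;

class MutableClock extends Clock
{
  // A Clock that only moves when told to, so that the condition
  // "now - lastCommit >= autoCommitIntervalMs" can be triggered deliberately
  private Instant instant = Instant.EPOCH;

  @Override
  public ZoneId getZone()
  {
    return ZoneId.systemDefault();
  }

  @Override
  public Clock withZone(ZoneId zone)
  {
    return this;
  }

  @Override
  public Instant instant()
  {
    return instant;
  }

  void advance(Duration duration)
  {
    instant = instant.plus(duration);
  }
}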