WIP
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 18:23:36 +0000 (19:23 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 15:49:52 +0000 (16:49 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 49875a0..8437bb4 100644 (file)
@@ -12,6 +12,7 @@ import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.time.Clock;
 import java.util.Properties;
 
 
@@ -24,6 +25,7 @@ public class ApplicationConfiguration
       Consumer<String, String> kafkaConsumer,
       Producer<String, String> kafkaProducer,
       ApplicationProperties properties,
+      Clock clock,
       ConfigurableApplicationContext applicationContext)
   {
     return
@@ -33,6 +35,8 @@ public class ApplicationConfiguration
             kafkaConsumer,
             properties.getProducerProperties().getTopic(),
             kafkaProducer,
+            clock,
+            properties.getConsumerProperties().getAutoCommitInterval(),
             () -> applicationContext.close());
   }
 
@@ -43,6 +47,7 @@ public class ApplicationConfiguration
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("client.id", properties.getClientId());
     props.put("group.id", properties.getConsumerProperties().getGroupId());
+    props.put("enable.auto.commit", false);
     if (properties.getConsumerProperties().getAutoOffsetReset() != null)
     {
       props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name());
@@ -65,6 +70,7 @@ public class ApplicationConfiguration
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("client.id", properties.getClientId());
+    props.put("transactional.id", "my-tx");
     props.put("acks", properties.getProducerProperties().getAcks());
     props.put("batch.size", properties.getProducerProperties().getBatchSize());
     props.put("metadata.maxage.ms",   5000); //  5 Sekunden
@@ -77,4 +83,10 @@ public class ApplicationConfiguration
 
     return new KafkaProducer<>(props);
   }
+
+  @Bean
+  Clock clock()
+  {
+    return Clock.systemDefaultZone();
+  }
 }
index 0fbeead..73f56a8 100644 (file)
@@ -1,18 +1,18 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.Phaser;
+import java.util.stream.Collectors;
 
 
 @Slf4j
@@ -27,7 +27,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final String stateTopic;
   private final Producer<String, String> producer;
 
+  private final Clock clock;
+  private final Duration commitInterval;
+
   private volatile boolean running = false;
+  private Instant lastCommit;
   private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
   private volatile State[] partitionStates;
@@ -46,6 +50,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     Consumer<String, String> consumer,
     String stateTopic,
     Producer<String, String> producer,
+    Clock clock,
+    Duration commitInterval,
     Runnable closeCallback)
   {
     this.id = clientId;
@@ -53,6 +59,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     this.consumer = consumer;
     this.stateTopic = stateTopic;
     this.producer = producer;
+    this.clock = clock;
+    this.commitInterval = commitInterval;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
@@ -85,6 +93,12 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       consumer.subscribe(Arrays.asList(topic, stateTopic), this);
       running = true;
 
+      log.info("{} - Initializing the transaction", id);
+      producer.initTransactions();
+
+      lastCommit = clock.instant();
+      producer.beginTransaction();
+
       while (running)
       {
         ConsumerRecords<String, String> records =
@@ -132,6 +146,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
         int arrivedPhase = phaser.arriveAndAwaitAdvance();
         log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
+
+        commitIfNecessary();
       }
     }
     catch(WakeupException e)
@@ -141,9 +157,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     catch(Exception e)
     {
       log.error("{} - Unexpected error, unsubscribing!", id, e.toString());
-      consumer.unsubscribe();
+      producer.abortTransaction();
       log.info("{} - Triggering exit of application!", id);
       new Thread(closeCallback).start();
+      return;
     }
     finally
     {
@@ -151,6 +168,34 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       consumer.close();
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
     }
+
+    producer.commitTransaction();
+  }
+
+  private void commitIfNecessary()
+  {
+    Instant now = clock.instant();
+    if (!now.isBefore(lastCommit.plus(commitInterval)))
+    {
+      producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata());
+      commit();
+      lastCommit = now;
+    }
+  }
+
+  private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets()
+  {
+    return assignedPartitions
+      .stream()
+      .collect(Collectors.toMap(
+        partition -> partition,
+        partition -> new OffsetAndMetadata(consumer.position(partition))));
+  }
+
+  private void commit()
+  {
+    producer.commitTransaction();
+    producer.beginTransaction();
   }
 
   private void handleRecord(