WIP
authorKai Moritz <kai@juplo.de>
Sat, 3 Jun 2023 10:08:56 +0000 (12:08 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 17 Jun 2023 12:43:14 +0000 (14:43 +0200)
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/resources/application.yml

index c4492a4..98cb40a 100644 (file)
@@ -75,6 +75,10 @@ services:
   cli:
     image: juplo/toolbox
     command: sleep infinity
+    depends_on:
+      - kafka-1
+      - kafka-2
+      - kafka-3
 
   gateway:
     image: juplo/sumup-gateway:1.0-SNAPSHOT
index c7b0b8e..481fce0 100644 (file)
@@ -29,7 +29,6 @@ public class ApplicationProperties
   @NotEmpty
   private String topicIn;
   @NotNull
-  @NotEmpty
   private Integer partitions;
   @NotNull
   @NotEmpty
index 3e98842..771b3a2 100644 (file)
@@ -45,13 +45,17 @@ public class EndlessConsumer<K, V> implements Runnable
   @Override
   public void run()
   {
+    OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+
     try
     {
-      OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+      producer.initTransactions();
 
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic), offsetRecorder);
 
+      producer.beginTransaction();
+
       while (true)
       {
         ConsumerRecords<K, V> records =
@@ -80,6 +84,8 @@ public class EndlessConsumer<K, V> implements Runnable
             producer.sendOffsetsToTransaction(
                 offsetRecorder.getOffsets(),
                 consumer.groupMetadata());
+            producer.commitTransaction();
+            producer.beginTransaction();
           }
           consumed++;
         }
@@ -88,7 +94,10 @@ public class EndlessConsumer<K, V> implements Runnable
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
+      producer.sendOffsetsToTransaction(
+          offsetRecorder.getOffsets(),
+          consumer.groupMetadata());
+      producer.commitTransaction();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -102,12 +111,16 @@ public class EndlessConsumer<K, V> implements Runnable
           offset,
           e.getCause().toString());
 
-      consumer.commitSync();
+      producer.sendOffsetsToTransaction(
+          offsetRecorder.getOffsets(),
+          consumer.groupMetadata());
+      producer.commitTransaction();
       shutdown(e);
     }
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}", id, e.toString(), e);
+      producer.abortTransaction();
       shutdown(e);
     }
     finally
@@ -141,7 +154,7 @@ public class EndlessConsumer<K, V> implements Runnable
       if (!activePartitions[partition])
         throw new IllegalStateException("Partition " + partition + " is not active!");
 
-      offsets[partition] = offset;
+      offsets[partition] = offset + 1;
     }
 
     Map<TopicPartition, OffsetAndMetadata> getOffsets()
@@ -151,9 +164,12 @@ public class EndlessConsumer<K, V> implements Runnable
       for (int i=0; i<offsets.length; i++)
       {
         if (activePartitions[i] && offsets[i] > -1)
+        {
+          log.info("Offset for partition {} is {}", i, offsets[i]);
           recordedOffsets.put(
               new TopicPartition(topic, i),
               new OffsetAndMetadata(offsets[i]));
+        }
       }
 
       return recordedOffsets;
@@ -182,9 +198,20 @@ public class EndlessConsumer<K, V> implements Runnable
                   () -> new HashMap<TopicPartition, OffsetAndMetadata>(),
                   (map, topicPartition) ->
                   {
-                    log.info("Commiting & deactivating revoked partition {}", topicPartition);
+                    log.info("Deactivating revoked partition {}", topicPartition);
                     activePartitions[topicPartition.partition()] = false;
-                    map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()]));
+                    if (offsets[topicPartition.partition()] < 0)
+                    {
+                      log.info("No offset to commit for partition {}", topicPartition);
+                    }
+                    else
+                    {
+                      log.info(
+                          "Commiting offset {} for partition {}",
+                          offsets[topicPartition.partition()],
+                          topicPartition);
+                      map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()]));
+                    }
                   },
                   (mapA, mapB) -> mapA.putAll(mapB)),
           consumer.groupMetadata());
index bf339b0..1c5a517 100644 (file)
@@ -4,6 +4,7 @@ sumup:
     group-id: sumup-requests
     client-id: DEV
     topic-in: in
+    partitions: 2
     topic-out: out
     auto-offset-reset: earliest
     commit-interval: 5s