From: Kai Moritz <kai@juplo.de>
Date: Sat, 3 Jun 2023 10:08:56 +0000 (+0200)
Subject: WIP
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=6f3bc4b9531d967df59cc6a6f2d76d7ba9d2ec7a;p=demos%2Fkafka%2Ftraining

WIP
---

diff --git a/docker-compose.yml b/docker-compose.yml
index c4492a4b..98cb40a7 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -75,6 +75,10 @@ services:
   cli:
     image: juplo/toolbox
     command: sleep infinity
+    depends_on:
+      - kafka-1
+      - kafka-2
+      - kafka-3
 
   gateway:
     image: juplo/sumup-gateway:1.0-SNAPSHOT
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index c7b0b8e1..481fce0d 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -29,7 +29,6 @@ public class ApplicationProperties
   @NotEmpty
   private String topicIn;
   @NotNull
-  @NotEmpty
   private Integer partitions;
   @NotNull
   @NotEmpty
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 3e98842f..771b3a21 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -45,13 +45,17 @@ public class EndlessConsumer<K, V> implements Runnable
   @Override
   public void run()
   {
+    OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+
     try
     {
-      OffsetRecorder offsetRecorder = new OffsetRecorder(topic, partitions);
+      producer.initTransactions();
 
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic), offsetRecorder);
 
+      producer.beginTransaction();
+
       while (true)
       {
         ConsumerRecords<K, V> records =
@@ -80,6 +84,8 @@ public class EndlessConsumer<K, V> implements Runnable
             producer.sendOffsetsToTransaction(
                 offsetRecorder.getOffsets(),
                 consumer.groupMetadata());
+            producer.commitTransaction();
+            producer.beginTransaction();
           }
           consumed++;
         }
@@ -88,7 +94,10 @@ public class EndlessConsumer<K, V> implements Runnable
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
+      producer.sendOffsetsToTransaction(
+          offsetRecorder.getOffsets(),
+          consumer.groupMetadata());
+      producer.commitTransaction();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -102,12 +111,16 @@ public class EndlessConsumer<K, V> implements Runnable
           offset,
           e.getCause().toString());
 
-      consumer.commitSync();
+      producer.sendOffsetsToTransaction(
+          offsetRecorder.getOffsets(),
+          consumer.groupMetadata());
+      producer.commitTransaction();
       shutdown(e);
     }
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}", id, e.toString(), e);
+      producer.abortTransaction();
       shutdown(e);
     }
     finally
@@ -141,7 +154,7 @@ public class EndlessConsumer<K, V> implements Runnable
       if (!activePartitions[partition])
         throw new IllegalStateException("Partition " + partition + " is not active!");
 
-      offsets[partition] = offset;
+      offsets[partition] = offset + 1;
     }
 
     Map<TopicPartition, OffsetAndMetadata> getOffsets()
@@ -151,9 +164,12 @@ public class EndlessConsumer<K, V> implements Runnable
       for (int i=0; i<offsets.length; i++)
       {
         if (activePartitions[i] && offsets[i] > -1)
+        {
+          log.info("Offset for partition {} is {}", i, offsets[i]);
           recordedOffsets.put(
               new TopicPartition(topic, i),
               new OffsetAndMetadata(offsets[i]));
+        }
       }
 
       return recordedOffsets;
@@ -182,9 +198,20 @@ public class EndlessConsumer<K, V> implements Runnable
                   () -> new HashMap<TopicPartition, OffsetAndMetadata>(),
                   (map, topicPartition) ->
                   {
-                    log.info("Commiting & deactivating revoked partition {}", topicPartition);
+                    log.info("Deactivating revoked partition {}", topicPartition);
                     activePartitions[topicPartition.partition()] = false;
-                    map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()]));
+                    if (offsets[topicPartition.partition()] < 0)
+                    {
+                      log.info("No offset to commit for partition {}", topicPartition);
+                    }
+                    else
+                    {
+                      log.info(
+                          "Commiting offset {} for partition {}",
+                          offsets[topicPartition.partition()],
+                          topicPartition);
+                      map.put(topicPartition, new OffsetAndMetadata(offsets[topicPartition.partition()]));
+                    }
                   },
                   (mapA, mapB) -> mapA.putAll(mapB)),
           consumer.groupMetadata());
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index bf339b07..1c5a517a 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -4,6 +4,7 @@ sumup:
     group-id: sumup-requests
     client-id: DEV
     topic-in: in
+    partitions: 2
     topic-out: out
     auto-offset-reset: earliest
     commit-interval: 5s