From 313d30f59c98c36b7706a14c59417853c502b490 Mon Sep 17 00:00:00 2001
From: Kai Moritz <k.moritz@lvm.de>
Date: Mon, 1 Feb 2021 15:45:54 +0100
Subject: [PATCH] Fixed bug on first start-up (no current offsets)

---
 .../kafka/outbox/delivery/OutboxProducer.java   | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
index 1327113..f0a226e 100644
--- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
+++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
@@ -62,6 +62,7 @@ public class OutboxProducer
     props = new Properties();
     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put(GROUP_ID_CONFIG, "outbox");
+    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
@@ -81,15 +82,23 @@ public class OutboxProducer
     long[] currentOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
     {
-      LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
-      currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+      if (entry.getValue() == null)
+      {
+        LOG.debug("Found no offset for partition {}", entry.getKey());
+        currentOffsets[entry.getKey().partition()] = -1l;
+      }
+      else
+      {
+        LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+        currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+      }
     }
     LOG.info("Current offsets: {}", currentOffsets);
 
     long[] endOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
     {
-      LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+      LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
       endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
     }
     LOG.info("End-offsets: {}", endOffsets);
@@ -146,7 +155,7 @@ public class OutboxProducer
   int cleanUp()
   {
     int deleted = repository.delete(send);
-    LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+    LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
     send.clear();
     return deleted;
   }
-- 
2.20.1