Fixed bug on first start-up (no current offsets) master
authorKai Moritz <k.moritz@lvm.de>
Mon, 1 Feb 2021 14:45:54 +0000 (15:45 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 16 May 2021 22:04:08 +0000 (00:04 +0200)
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java

index 1327113..f0a226e 100644 (file)
@@ -62,6 +62,7 @@ public class OutboxProducer
     props = new Properties();
     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put(GROUP_ID_CONFIG, "outbox");
     props = new Properties();
     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put(GROUP_ID_CONFIG, "outbox");
+    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
@@ -81,15 +82,23 @@ public class OutboxProducer
     long[] currentOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
     {
     long[] currentOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
     {
-      LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
-      currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+      if (entry.getValue() == null)
+      {
+        LOG.debug("Found no offset for partition {}", entry.getKey());
+        currentOffsets[entry.getKey().partition()] = -1l;
+      }
+      else
+      {
+        LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+        currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+      }
     }
     LOG.info("Current offsets: {}", currentOffsets);
 
     long[] endOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
     {
     }
     LOG.info("Current offsets: {}", currentOffsets);
 
     long[] endOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
     {
-      LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+      LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
       endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
     }
     LOG.info("End-offsets: {}", endOffsets);
       endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
     }
     LOG.info("End-offsets: {}", endOffsets);
@@ -146,7 +155,7 @@ public class OutboxProducer
   int cleanUp()
   {
     int deleted = repository.delete(send);
   int cleanUp()
   {
     int deleted = repository.delete(send);
-    LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+    LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
     send.clear();
     return deleted;
   }
     send.clear();
     return deleted;
   }