Fixed bug on first start-up (no current offsets) eod-outbox-alt
authorKai Moritz <k.moritz@lvm.de>
Mon, 1 Feb 2021 14:45:54 +0000 (15:45 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 6 Feb 2021 18:11:30 +0000 (19:11 +0100)
delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java
docker-compose.yml

index b139b4d..0bc284a 100644 (file)
@@ -62,6 +62,7 @@ public class OutboxProducer
     props = new Properties();
     props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
     props.put(GROUP_ID_CONFIG, "outbox");
+    props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
     props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
@@ -80,15 +81,23 @@ public class OutboxProducer
     long[] currentOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
     {
-      LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
-      currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+      if (entry.getValue() == null)
+      {
+        LOG.debug("Found no offset for partition {}", entry.getKey());
+        currentOffsets[entry.getKey().partition()] = -1l;
+      }
+      else
+      {
+        LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+        currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+      }
     }
     LOG.info("Current offsets: {}", currentOffsets);
 
     long[] endOffsets = new long[partitions.size()];
     for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
     {
-      LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+      LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
       endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
     }
     LOG.info("End-offsets: {}", endOffsets);
@@ -145,7 +154,7 @@ public class OutboxProducer
   int cleanUp()
   {
     int deleted = repository.delete(send);
-    LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+    LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
     send.clear();
     return deleted;
   }
index c03aeb7..e4aa2a4 100644 (file)
@@ -24,7 +24,7 @@ services:
 
   kafkacat:
     image: confluentinc/cp-kafkacat:6.0.1
-    command: "kafkacat -C -b kafka:9093 -t outbox -K:"
+    command: "kafkacat -C -b kafka:9093 -q -t outbox -K:"
     tty: true
     depends_on:
       - kafka