From: Kai Moritz Date: Mon, 1 Feb 2021 14:45:54 +0000 (+0100) Subject: Fixed bug on first start-up (no current offsets) X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=313d30f59c98c36b7706a14c59417853c502b490;p=demos%2Fkafka%2Foutbox Fixed bug on first start-up (no current offsets) --- diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index 1327113..f0a226e 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -62,6 +62,7 @@ public class OutboxProducer props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); props.put(GROUP_ID_CONFIG, "outbox"); + props.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -81,15 +82,23 @@ public class OutboxProducer long[] currentOffsets = new long[partitions.size()]; for (Map.Entry entry : consumer.committed(assignment).entrySet()) { - LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey()); - currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l; + if (entry.getValue() == null) + { + LOG.debug("Found no offset for partition {}", entry.getKey()); + currentOffsets[entry.getKey().partition()] = -1l; + } + else + { + LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey()); + currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l; + } } LOG.info("Current offsets: {}", currentOffsets); long[] endOffsets = new long[partitions.size()]; for (Map.Entry entry : consumer.endOffsets(assignment).entrySet()) { - LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey()); + LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey()); endOffsets[entry.getKey().partition()] = entry.getValue() - 1l; } LOG.info("End-offsets: {}", endOffsets); @@ -146,7 +155,7 @@ public class OutboxProducer int cleanUp() { int deleted = repository.delete(send); - LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size()); + LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size()); send.clear(); return deleted; }