From: Kai Moritz Date: Mon, 1 Feb 2021 14:45:54 +0000 (+0100) Subject: Fixed bug on first start-up (no current offsets) X-Git-Tag: eod-outbox-alt X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=38520af8a21adc0a0590bafec3ed0dd080028604;p=demos%2Fkafka%2Foutbox Fixed bug on first start-up (no current offsets) --- diff --git a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java index b139b4d..0bc284a 100644 --- a/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java +++ b/delivery/src/main/java/de/juplo/kafka/outbox/delivery/OutboxProducer.java @@ -62,6 +62,7 @@ public class OutboxProducer props = new Properties(); props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); props.put(GROUP_ID_CONFIG, "outbox"); + props.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -80,15 +81,23 @@ public class OutboxProducer long[] currentOffsets = new long[partitions.size()]; for (Map.Entry entry : consumer.committed(assignment).entrySet()) { - LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey()); - currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l; + if (entry.getValue() == null) + { + LOG.debug("Found no offset for partition {}", entry.getKey()); + currentOffsets[entry.getKey().partition()] = -1l; + } + else + { + LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey()); + currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l; + } } LOG.info("Current offsets: {}", currentOffsets); long[] endOffsets = new long[partitions.size()]; for (Map.Entry entry : consumer.endOffsets(assignment).entrySet()) { - LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey()); + LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey()); endOffsets[entry.getKey().partition()] = entry.getValue() - 1l; } LOG.info("End-offsets: {}", endOffsets); @@ -145,7 +154,7 @@ public class OutboxProducer int cleanUp() { int deleted = repository.delete(send); - LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size()); + LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size()); send.clear(); return deleted; } diff --git a/docker-compose.yml b/docker-compose.yml index c03aeb7..e4aa2a4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,7 +24,7 @@ services: kafkacat: image: confluentinc/cp-kafkacat:6.0.1 - command: "kafkacat -C -b kafka:9093 -t outbox -K:" + command: "kafkacat -C -b kafka:9093 -q -t outbox -K:" tty: true depends_on: - kafka