props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
props.put(GROUP_ID_CONFIG, "outbox");
+ props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
long[] currentOffsets = new long[partitions.size()];
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : consumer.committed(assignment).entrySet())
{
- LOG.info("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
- currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+ if (entry.getValue() == null)
+ {
+ LOG.debug("Found no offset for partition {}", entry.getKey());
+ currentOffsets[entry.getKey().partition()] = -1l;
+ }
+ else
+ {
+ LOG.debug("Found current offset {} for partition {}", entry.getValue(), entry.getKey());
+ currentOffsets[entry.getKey().partition()] = entry.getValue().offset() - 1l;
+ }
}
LOG.info("Current offsets: {}", currentOffsets);
long[] endOffsets = new long[partitions.size()];
for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(assignment).entrySet())
{
- LOG.info("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
+ LOG.debug("Found next offset {} for partition {}", entry.getValue(), entry.getKey());
endOffsets[entry.getKey().partition()] = entry.getValue() - 1l;
}
LOG.info("End-offsets: {}", endOffsets);
int cleanUp()
{
int deleted = repository.delete(send);
- LOG.debug("Cleaned up {}/{} entries from outbox, next clean-up: {}", deleted, send.size());
+ LOG.debug("Cleaned up {}/{} entries from outbox", deleted, send.size());
send.clear();
return deleted;
}