stateRepository
.findById(Integer.toString(partition))
.orElse(new StateDocument(partition));
- if (document.offset >= 0)
- {
- // Only seek, if a stored offset was found
- // Otherwise: Use initial offset, generated by Kafka
- consumer.seek(tp, document.offset);
- log.info(
- "{} - Seeking to offset {} for partition {}",
- id,
- document.offset,
- partition);
- }
+ log.info(
+ "{} - Offset of next unseen message for partition {}: {}",
+ id,
+ partition,
+ document.offset);
+ consumer.seek(tp, document.offset);
recordHandler.addPartition(partition, document.state);
for (String user : document.state.keySet())
{
{
@Id
public String id;
- public long offset = -1l;
+ public long offset = 0l;
public Map<String, AdderResult> state;
public Map<String, List<AdderResult>> results;