* Hier wurde die Deaktivierung der Berücksichtigung des über
`auto.offset.rest` konfigurierbaren Verhaltens der Consumer-Bibiliothek
für den Fall, dass noch keine Offset-Position bekannt ist, zurückgebaut,
um die einzelnen Schritte der Übung leichter nachvollziehbar zu machen.
stateRepository
.findById(Integer.toString(partition))
.orElse(new StateDocument(partition));
stateRepository
.findById(Integer.toString(partition))
.orElse(new StateDocument(partition));
- if (document.offset >= 0)
- {
- // Only seek, if a stored offset was found
- // Otherwise: Use initial offset, generated by Kafka
- consumer.seek(tp, document.offset);
- log.info(
- "{} - Seeking to offset {} for partition {}",
- id,
- document.offset,
- partition);
- }
+ log.info(
+ "{} - Offset of next unseen message for partition {}: {}",
+ id,
+ partition,
+ document.offset);
+ consumer.seek(tp, document.offset);
recordHandler.addPartition(partition, document.state);
for (String user : document.state.keySet())
{
recordHandler.addPartition(partition, document.state);
for (String user : document.state.keySet())
{
- public long offset = -1l;
+ public long offset = 0l;
public Map<String, AdderResult> state;
public Map<String, List<AdderResult>> results;
public Map<String, AdderResult> state;
public Map<String, List<AdderResult>> results;