- Long offset = consumer.position(tp);
- log.info(
- "{} - removing partition: {}, offset of next message {})",
- id,
- partition,
- offset);
- if (commitsEnabled)
- {
- Map<String, AdderResult> state = recordHandler.removePartition(partition);
- Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
- stateRepository.save(new StateDocument(partition, state, results, offset));
- }
- else
+ Map<String, AdderResult> state = recordHandler.removePartition(partition);
+ for (String key : state.keySet())