- Long offset = consumer.position(tp);
- log.info(
- "{} - removing partition: {}, offset of next message {})",
- id,
- partition,
- offset);
- if (commitsEnabled)
- {
- Map<String, AdderResult> removed = recordHandler.removePartition(partition);
- stateRepository.save(new StateDocument(partition, removed, offset));
- }
- else
+ log.info("{} - removing partition: {}", id, partition);
+ this.partitions.remove(partition);
+ Map<String, AdderResult> state = recordHandler.removePartition(partition);
+ for (String user : state.keySet())