- log.info("{} - removing partition: {}", id, partition);
- Map<String, Long> removed = recordHandler.removePartition(partition);
- for (String key : removed.keySet())
+ this.partitions.remove(partition);
+ Long offset = consumer.position(tp);
+ log.info(
+ "{} - removing partition: {}, offset of next message {})",
+ id,
+ partition,
+ offset);
+ if (commitsEnabled)
+ {
+ Map<String, AdderResult> state = recordHandler.removePartition(partition);
+ Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
+ stateRepository.save(new StateDocument(partition, state, results, offset));
+ }
+ else