- log.debug("Storing data and offsets, last commit: {}", lastCommit);
- recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
- new StateDocument(
- partiton,
- adder.getState(),
- consumer.position(new TopicPartition(topic, partiton)))));
- lastCommit = clock.instant();
- }
+ Integer partition = tp.partition();
+ log.info("{} - removing partition: {}", id, partition);
+ this.partitions.remove(partition);
+ Map<String, AdderResult> state = recordHandler.removePartition(partition);
+ for (String user : state.keySet())
+ {
+ log.info(
+ "{} - Saved state for partition={}|user={}: {}",
+ id,
+ partition,
+ user,
+ state.get(user));
+ }
+ Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
+ stateRepository.save(new StateDocument(partition, state, results));
+ });