ApplicationRecordHandler recordHandler,
AdderResults adderResults,
StateRepository stateRepository,
- ConsumerFactory<String, Message> consumerFactory,
KafkaProperties kafkaProperties)
{
return new ApplicationRebalanceListener(
recordHandler,
adderResults,
stateRepository,
- consumerFactory.createConsumer(),
kafkaProperties.getConsumer().getGroupId());
}
private final ApplicationRecordHandler recordHandler;
private final AdderResults adderResults;
private final StateRepository stateRepository;
- private final Consumer consumer;
private final String id;
private final Set<Integer> partitions = new HashSet<>();
@Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ public void onPartitionsAssigned(
+ Consumer<? , ?> consumer,
+ Collection<TopicPartition> partitions)
{
partitions.forEach(tp ->
{
}
@Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ public void onPartitionsRevokedAfterCommit(
+ Consumer<?, ?> consumer,
+ Collection<TopicPartition> partitions)
{
partitions.forEach(tp ->
{