import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.Phaser;
private volatile boolean running = false;
private final Phaser phaser = new Phaser(1);
+ private final Set<TopicPartition> assignedPartitions = new HashSet<>();
private volatile int[] seen;
private volatile int[] acked;
private volatile boolean[] done;
int phase = phaser.getPhase();
log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
- records
- .partitions()
+ assignedPartitions
.forEach(partition ->
{
seen[partition.partition()] = 0;
}
done[partition.partition()] = true;
+ if (seen[partition.partition()] == 0)
+ {
+ int arrivedPhase = phaser.arrive();
+ log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase);
+ }
});
int arrivedPhase = phaser.arriveAndAwaitAdvance();
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
+ assignedPartitions.addAll(partitions);
phaser.bulkRegister(partitions.size());
log.info(
"{} - Added {} parties for newly assigned partitions. New total number of parties: {}",
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
+ assignedPartitions.removeAll(partitions);
partitions.forEach(partition -> phaser.arriveAndDeregister());
log.info(
"{} - Removed {} parties for revoked partitions. New total number of parties: {}",