package de.juplo.kafka;
+import lombok.RequiredArgsConstructor;
+
import java.util.HashMap;
import java.util.Map;
+@RequiredArgsConstructor
public class CounterState
{
- private final Map<String, Long> counterState = new HashMap<>();
-
+ private final Map<String, Long> counterState;
- public void setCounterState(String key, long counter)
- {
- counterState.put(key, counter);
- }
public synchronized Long addToCounter(String key)
{
private final Phaser phaser = new Phaser(1);
private final Set<TopicPartition> assignedPartitions = new HashSet<>();
private volatile PartitionState[] partitionStates;
+ private Map<String, Long>[] restoredState;
private CounterState[] counterState;
private volatile long[] stateEndOffsets;
private volatile int[] seen;
{
partitionStates[i] = PartitionState.UNASSIGNED;
}
+ restoredState = new Map[numPartitions];
counterState = new CounterState[numPartitions];
stateEndOffsets = new long[numPartitions];
seen = new int[numPartitions];
String key,
String value)
{
- counterState[partition].setCounterState(key, Long.parseLong(value));
+ restoredState[partition].put(key, Long.parseLong(value));
if (offset + 1 == stateEndOffsets[partition])
{
log.info("{} - Restoring of state for partition {} done!", id, partition);
private void restoreAndAssign(int partition)
{
- counterState[partition] = new CounterState();
-
TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
long stateEndOffset = consumer
}
else
{
- log.info("{} - State is up-to-date for partition {}", id, partition);
+ log.info("{} - No state available for partition {}", id, partition);
+ restoredState[partition] = new HashMap<>();
stateAssigned(partition);
}
}
statePartition);
consumer.seek(statePartition, stateBeginningOffset);
stateEndOffsets[partition] = stateEndOffset;
+ restoredState[partition] = new HashMap<>();
log.info("{} - Resuming state partition {}", id, statePartition);
consumer.resume(List.of(statePartition));
}
TopicPartition statePartition = new TopicPartition(stateTopic, partition);
log.info("{} - Pausing state partition {}...", id, statePartition);
consumer.pause(List.of(statePartition));
+ counterState[partition] = new CounterState(restoredState[partition]);
+ restoredState[partition] = null;
TopicPartition messagePartition = new TopicPartition(topic, partition);
log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition);