From 01ca29b9857dfc5d87bf869f101fb38e5771c6d8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 17:05:23 +0100 Subject: [PATCH] =?utf8?q?Refactor:=20Zustand=20muss=20`CounterState`=20vo?= =?utf8?q?llst=C3=A4ndig=20=C3=BCbergeben=20werden?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/CounterState.java | 10 ++++------ src/main/java/de/juplo/kafka/ExampleConsumer.java | 12 ++++++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/de/juplo/kafka/CounterState.java b/src/main/java/de/juplo/kafka/CounterState.java index 5e39371..715a97a 100644 --- a/src/main/java/de/juplo/kafka/CounterState.java +++ b/src/main/java/de/juplo/kafka/CounterState.java @@ -1,18 +1,16 @@ package de.juplo.kafka; +import lombok.RequiredArgsConstructor; + import java.util.HashMap; import java.util.Map; +@RequiredArgsConstructor public class CounterState { - private final Map counterState = new HashMap<>(); - + private final Map counterState; - public void setCounterState(String key, long counter) - { - counterState.put(key, counter); - } public synchronized Long addToCounter(String key) { diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 112e2ff..3cc87bb 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -31,6 +31,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Phaser phaser = new Phaser(1); private final Set assignedPartitions = new HashSet<>(); private volatile PartitionState[] partitionStates; + private Map[] restoredState; private CounterState[] counterState; private volatile long[] stateEndOffsets; private volatile int[] seen; @@ -73,6 +74,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { partitionStates[i] = PartitionState.UNASSIGNED; } + restoredState = new Map[numPartitions]; counterState = new CounterState[numPartitions]; stateEndOffsets = new long[numPartitions]; seen = new int[numPartitions]; @@ -175,7 +177,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String key, String value) { - counterState[partition].setCounterState(key, Long.parseLong(value)); + restoredState[partition].put(key, Long.parseLong(value)); if (offset + 1 == stateEndOffsets[partition]) { log.info("{} - Restoring of state for partition {} done!", id, partition); @@ -311,8 +313,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private void restoreAndAssign(int partition) { - counterState[partition] = new CounterState(); - TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); long stateEndOffset = consumer @@ -337,7 +337,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } else { - log.info("{} - State is up-to-date for partition {}", id, partition); + log.info("{} - No state available for partition {}", id, partition); + restoredState[partition] = new HashMap<>(); stateAssigned(partition); } } @@ -373,6 +374,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener statePartition); consumer.seek(statePartition, stateBeginningOffset); stateEndOffsets[partition] = stateEndOffset; + restoredState[partition] = new HashMap<>(); log.info("{} - Resuming state partition {}", id, statePartition); consumer.resume(List.of(statePartition)); } @@ -390,6 +392,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener TopicPartition statePartition = new TopicPartition(stateTopic, partition); log.info("{} - Pausing state partition {}...", id, statePartition); consumer.pause(List.of(statePartition)); + counterState[partition] = new CounterState(restoredState[partition]); + restoredState[partition] = null; TopicPartition messagePartition = new TopicPartition(topic, partition); log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition); -- 2.20.1