From 998470eaf40bb71f517423e1aee8726aaf258d84 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 16:54:01 +0100 Subject: [PATCH] =?utf8?q?Der=20Z=C3=A4hlerzustand=20wird=20separat=20pro?= =?utf8?q?=20Partition=20verwaltet?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen. * D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand die angefragte Instanz gerade verfügt. --- .../juplo/kafka/CounterStateController.java | 2 +- .../java/de/juplo/kafka/ExampleConsumer.java | 20 ++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/kafka/CounterStateController.java b/src/main/java/de/juplo/kafka/CounterStateController.java index 3d96582..723a8d0 100644 --- a/src/main/java/de/juplo/kafka/CounterStateController.java +++ b/src/main/java/de/juplo/kafka/CounterStateController.java @@ -15,7 +15,7 @@ public class CounterStateController private final ExampleConsumer consumer; @GetMapping - Map getAllCounters() + Map> getAllCounters() { return new HashMap<>(consumer.getCounterState()); } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 42daf03..112e2ff 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -24,7 +24,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Thread workerThread; private final Runnable closeCallback; - private final CounterState counterState = new CounterState(); private final String stateTopic; private final Producer producer; @@ -32,6 +31,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final Phaser phaser = new Phaser(1); private final Set assignedPartitions = new HashSet<>(); private volatile PartitionState[] partitionStates; + private CounterState[] counterState; private volatile long[] stateEndOffsets; private volatile int[] seen; private volatile int[] acked; @@ -73,6 +73,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { partitionStates[i] = PartitionState.UNASSIGNED; } + counterState = new CounterState[numPartitions]; stateEndOffsets = new long[numPartitions]; seen = new int[numPartitions]; acked = new int[numPartitions]; @@ -174,7 +175,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener String key, String value) { - counterState.setCounterState(key, Long.parseLong(value)); + counterState[partition].setCounterState(key, Long.parseLong(value)); if (offset + 1 == stateEndOffsets[partition]) { log.info("{} - Restoring of state for partition {} done!", id, partition); @@ -196,19 +197,21 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener Integer partition, String key) { - Long counter = computeCount(key); + Long counter = computeCount(partition, key); log.info("{} - current value for counter {}: {}", id, key, counter); sendCounterState(partition, key, counter); } - private synchronized Long computeCount(String key) + private synchronized Long computeCount(int partition, String key) { - return counterState.addToCounter(key); + return counterState[partition].addToCounter(key); } - public Map getCounterState() + public Map> getCounterState() { - return counterState.getCounterState(); + Map> result = new HashMap<>(assignedPartitions.size()); + assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState())); + return result; } void sendCounterState(int partition, String key, Long counter) @@ -308,6 +311,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private void restoreAndAssign(int partition) { + counterState[partition] = new CounterState(); + TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); long stateEndOffset = consumer @@ -416,6 +421,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener TopicPartition messagePartition = new TopicPartition(topic, partition); log.info("{} - Revoking partition {}", id, messagePartition); assignedPartitions.remove(messagePartition); + counterState[partition] = null; phaser.arriveAndDeregister(); log.info( -- 2.20.1