From f20231dcb36eaeee69e07192b5785bc59abc1dc1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 5 Apr 2022 22:37:55 +0200 Subject: [PATCH] =?utf8?q?Report=20=C3=BCber=20gesehene=20Schl=C3=BCssel?= =?utf8?q?=20wiederbelebt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * An der alten Stelle war die Map ja jetzt nur noch leer, da dem Consumer zu dem Zeitpunkt, an dem die gesehen Schlüssel ausgegeben wurden, bereits alle Partitionen entzogen worden sind. * Daher werden die gesehenen Schlüssel jetzt ausgegeben, wenn eine Partition entzogen wird. --- .../java/de/juplo/kafka/EndlessConsumer.java | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 9aa8152..be371ae 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -74,13 +74,30 @@ public class EndlessConsumer implements Runnable @Override public void onPartitionsRevoked(Collection partitions) { - partitions.forEach(tp -> seen.remove(tp.partition())); + partitions.forEach(tp -> + { + log.info("{} - removing partition: {}", id, tp); + Map removed = seen.remove(tp.partition()); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + removed, + key); + } + }); } @Override public void onPartitionsAssigned(Collection partitions) { - partitions.forEach(tp -> seen.put(tp.partition(), new HashMap<>())); + partitions.forEach(tp -> + { + log.info("{} - adding partition: {}", id, tp); + seen.put(tp.partition(), new HashMap<>()); + }); } }); @@ -130,21 +147,6 @@ public class EndlessConsumer implements Runnable { log.info("{} - Closing the KafkaConsumer", id); consumer.close(); - - for (Integer partition : seen.keySet()) - { - Map byKey = seen.get(partition); - for (String key : byKey.keySet()) - { - log.info( - "{} - Seen {} messages for partition={}|key={}", - id, - byKey.get(key), - partition, - key); - } - } - log.info("{} - Consumer-Thread exiting", id); } } -- 2.20.1