From: Kai Moritz Date: Sat, 30 Apr 2022 09:38:26 +0000 (+0200) Subject: Merge branch 'deserialization' into springified-consumer--serialization X-Git-Tag: sumup-adder---lvm-2-tage~9^2~7^2~5 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=d2f482e5eb951e725f4193f97d979330e91c930e;hp=c4fd031abdae00bdbd934216579f0a38ddd46783;p=demos%2Fkafka%2Ftraining Merge branch 'deserialization' into springified-consumer--serialization --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 6601e6d..b5bd1b9 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -46,12 +46,12 @@ public class Application implements ApplicationRunner } finally { - if (!executor.isShutdown()) + if (!executor.isTerminated()) { log.warn("Forcing shutdown of ExecutorService!"); executor .shutdownNow() - .forEach(runnable -> log.warn("Unfinished task: {}", runnable.getClass().getSimpleName())); + .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName())); } log.info("Shutdow of ExecutorService finished"); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index b173b12..8802df9 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -19,7 +19,7 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements Runnable +public class EndlessConsumer implements ConsumerRebalanceListener, Runnable { private final ExecutorService executor; private final String id; @@ -37,55 +37,55 @@ public class EndlessConsumer implements Runnable private final Map offsets = new HashMap<>(); + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long newOffset = consumer.position(tp); + Long oldOffset = offsets.remove(partition); + log.info( + "{} - removing partition: {}, consumed {} records (offset {} -> {})", + id, + partition, + newOffset - oldOffset, + oldOffset, + newOffset); + Map removed = seen.remove(partition); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); + } + }); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long offset = consumer.position(tp); + log.info("{} - adding partition: {}, offset={}", id, partition, offset); + offsets.put(partition, offset); + seen.put(partition, new HashMap<>()); + }); + } + + @Override public void run() { try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() - { - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - Long oldOffset = offsets.remove(partition); - log.info( - "{} - removing partition: {}, consumed {} records (offset {} -> {})", - id, - partition, - newOffset - oldOffset, - oldOffset, - newOffset); - Map removed = seen.remove(partition); - for (String key : removed.keySet()) - { - log.info( - "{} - Seen {} messages for partition={}|key={}", - id, - removed.get(key), - partition, - key); - } - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info("{} - adding partition: {}, offset={}", id, partition, offset); - offsets.put(partition, offset); - seen.put(partition, new HashMap<>()); - }); - } - }); + consumer.subscribe(Arrays.asList(topic), this); while (true) {