From: Kai Moritz Date: Sat, 23 Jul 2022 13:35:01 +0000 (+0200) Subject: Merge der überarbeiteten Compose-Konfiguration ('counting-consumer') X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=66863e3169440f73ff450c7ba8ea4b9662b180e0;hp=f9c0ba7779552d8fcfc9cb29c8b689e20c314904;p=demos%2Fkafka%2Ftraining Merge der überarbeiteten Compose-Konfiguration ('counting-consumer') --- diff --git a/README.sh b/README.sh index c14f45b..13176d2 100755 --- a/README.sh +++ b/README.sh @@ -47,6 +47,7 @@ http -v :8081/seen sleep 1 http -v :8081/seen +docker-compose stop producer docker-compose exec -T cli bash << 'EOF' echo "Altering number of partitions from 3 to 7..." # tag::altertopic[] @@ -55,7 +56,7 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test # end::altertopic[] EOF -docker-compose restart producer +docker-compose start producer sleep 1 http -v :8081/seen sleep 1 diff --git a/docker-compose.yml b/docker-compose.yml index f9eeedd..1b067cd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,7 +39,7 @@ services: producer: image: juplo/endless-producer:1.0-SNAPSHOT ports: - - 8000:8080 + - 8080:8080 environment: server.port: 8080 producer.bootstrap-server: kafka:9092 diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 06e562c..1fb2a1b 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -1,9 +1,9 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; -import org.springframework.web.bind.annotation.GetMapping; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 2310ccd..c2d4447 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -1,19 +1,17 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import javax.annotation.PreDestroy; import java.time.Duration; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.Condition; @@ -39,7 +37,7 @@ public class EndlessConsumer implements Runnable private KafkaConsumer consumer = null; - private Map> seen; + private final Map> seen = new HashMap<>(); public EndlessConsumer( @@ -75,9 +73,37 @@ public class EndlessConsumer implements Runnable this.consumer = new KafkaConsumer<>(props); log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() + { + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + log.info("{} - removing partition: {}", id, tp); + Map removed = seen.remove(tp.partition()); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + tp.partition(), + key); + } + }); + } - seen = new HashMap<>(); + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> + { + log.info("{} - adding partition: {}", id, tp); + seen.put(tp.partition(), new HashMap<>()); + }); + } + }); while (true) { @@ -101,10 +127,6 @@ public class EndlessConsumer implements Runnable Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key(); - - if (!seen.containsKey(partition)) - seen.put(partition, new HashMap<>()); - Map byKey = seen.get(partition); if (!byKey.containsKey(key)) @@ -130,31 +152,10 @@ public class EndlessConsumer implements Runnable { log.info("{} - Closing the KafkaConsumer", id); consumer.close(); - - for (Integer partition : seen.keySet()) - { - Map byKey = seen.get(partition); - for (String key : byKey.keySet()) - { - log.info( - "{} - Seen {} messages for partition={}|key={}", - id, - byKey.get(key), - partition, - key); - } - } - seen = null; - log.info("{} - Consumer-Thread exiting", id); } } - public Map> getSeen() - { - return seen; - } - private void shutdown() { shutdown(null); @@ -175,6 +176,11 @@ public class EndlessConsumer implements Runnable } } + public Map> getSeen() + { + return seen; + } + public void start() { lock.lock();