From: Kai Moritz Date: Tue, 11 Mar 2025 17:55:21 +0000 (+0100) Subject: Vorlage (für das Bauen des Consumers als Docker-Image) X-Git-Tag: grundlagen/simple-consumer--image--vorlage--2026-06-lvm--rebase-vollständig X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fgrundlagen%2Fsimple-consumer--image--vorlage;p=demos%2Fkafka%2Ftraining Vorlage (für das Bauen des Consumers als Docker-Image) --- diff --git a/README.sh b/README.sh deleted file mode 100755 index 85b8f960..00000000 --- a/README.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/simple-consumer:1.0-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer - -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer - -docker compose -f docker/docker-compose.yml start consumer -sleep 5 - -docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 36ddc70d..f63da160 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,23 +1,20 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; -import java.util.Collection; import java.util.Properties; @Slf4j -public class ExampleConsumer implements ConsumerRebalanceListener +public class ExampleConsumer { private final String id; private final String topic; @@ -32,16 +29,16 @@ public class ExampleConsumer implements ConsumerRebalanceListener String groupId, String clientId) { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + // TODO: + // Instanziierung des KafkaConsumer aus Ihrer bisherigen Implementierung + // Hier: + // - Den Parameter "broker" des Konstruktors für "bootstrap.servers" verwenden + // - Den Parameter "groupId" des Konstruktors für "group.id" verwenden + // - Den Parameter "topic" des Konstruktors im Attribut "this.topic" merken + // - Den erzeugten KafkaConsumer in "this.consumer" ablegen this.id = clientId; this.topic = topic; - consumer = new KafkaConsumer<>(props); } @@ -50,78 +47,25 @@ public class ExampleConsumer implements ConsumerRebalanceListener try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), this); + // TODO: Aufruf von consumer.subscribe() + // Hier: + // - ACHTUNG: Abonnieren Sie das Topic "this.topic" running = true; while (true) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } + // TODO: Poll & Consume aus Ihrer bisherigen Implementierung + // Hier: + // - Zählen Sie außerdem die empfangenen Nachrichten in "this.consumed" } } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error, unsubscribing!", id, e); - consumer.unsubscribe(); - } finally { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); running = false; } } - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions - .stream() - .forEach(partition -> log.info("{} - partition assigned: {}", id, partition)); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions - .stream() - .forEach(partition -> log.info("{} - partition revoked: {}", id, partition)); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - partitions - .stream() - .forEach(partition -> log.info("{} - partition lost: {}", id, partition)); - } - public static void main(String[] args) throws Exception { @@ -148,7 +92,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener Runtime.getRuntime().addShutdownHook(new Thread(() -> { - instance.consumer.wakeup(); + // TODO: Endlosschleife beenden while (instance.running) {