From: Kai Moritz Date: Mon, 17 Mar 2025 15:29:28 +0000 (+0100) Subject: Vorlage X-Git-Tag: spring/spring-consumer--vorlage--2025-03-18--19-42 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=3ac9f183d61b5b32fcdcab5f29aa033ad916270e;p=demos%2Fkafka%2Ftraining Vorlage --- diff --git a/README.sh b/README.sh deleted file mode 100755 index b46e2350..00000000 --- a/README.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer - -sleep 5 -docker compose -f docker/docker-compose.yml stop consumer - -docker compose -f docker/docker-compose.yml start consumer -sleep 5 - -docker compose -f docker/docker-compose.yml stop producer consumer -docker compose -f docker/docker-compose.yml logs consumer diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 1f5a5706..303e30ee 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -45,22 +45,11 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + // TODO: Aufruf von consumer.subscribe() while (true) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } + // TODO: Poll & Consume aus Ihrer bisherigen Implementierung } } catch(WakeupException e) @@ -82,17 +71,6 @@ public class ExampleConsumer implements Runnable } } - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - } - public void shutdown() throws InterruptedException {