From 6c23b556b03d674344d1063d3fd2746d2d6f80d5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 9 Nov 2024 18:00:32 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20die=20Rebalance-=C3=9Cbung?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 39 ------------------- .../java/de/juplo/kafka/ExampleConsumer.java | 32 +-------------- 2 files changed, 2 insertions(+), 69 deletions(-) delete mode 100755 README.sh diff --git a/README.sh b/README.sh deleted file mode 100755 index bdefd2b..0000000 --- a/README.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2 - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d consumer-1 -sleep 10 -docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/ - -docker compose -f docker/docker-compose.yml up -d consumer-2 -sleep 10 -docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/ -docker compose -f docker/docker-compose.yml exec cli http -v consumer-2:8881/ - -docker compose -f docker/docker-compose.yml stop producer consumer-1 diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 53abd4d..7cdf210 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; @@ -13,7 +12,7 @@ import java.util.*; @Slf4j -public class ExampleConsumer implements Runnable, ConsumerRebalanceListener +public class ExampleConsumer implements Runnable { private final String id; private final String topic; @@ -55,7 +54,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener counterState = new CounterState[numPartitions]; log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), this); + consumer.subscribe(Arrays.asList(topic)); running = true; while (running) @@ -121,33 +120,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener } - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions - .stream() - .filter(partition -> partition.topic().equals(topic)) - .forEach(partition -> - { - assignedPartitions.add(partition); - counterState[partition.partition()] = new CounterState(new HashMap<>()); - }); - } - - @Override - public synchronized void onPartitionsRevoked(Collection partitions) - { - partitions - .stream() - .filter(partition -> partition.topic().equals(topic)) - .forEach(partition -> - { - assignedPartitions.remove(partition); - counterState[partition.partition()] = null; - }); - } - - public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id); -- 2.20.1