From: Kai Moritz Date: Fri, 2 Sep 2022 03:22:50 +0000 (+0200) Subject: Funktionsunabhängiger Name für das erweiterte Interface RebalanceListener X-Git-Tag: sumup-adder--vorlage---lvm-2-tage~12 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fsumup-adder--ohne--stored-offsets;p=demos%2Fkafka%2Ftraining Funktionsunabhängiger Name für das erweiterte Interface RebalanceListener --- diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index d319295..63d57df 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -10,7 +10,7 @@ import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements CommittingConsumerRebalanceListener +public class ApplicationRebalanceListener implements RebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; diff --git a/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java deleted file mode 100644 index 8aa92c0..0000000 --- a/src/main/java/de/juplo/kafka/CommittingConsumerRebalanceListener.java +++ /dev/null @@ -1,10 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; - - -public interface CommittingConsumerRebalanceListener extends ConsumerRebalanceListener -{ - void enableCommits(); - void disableCommits(); -} diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 63fc10f..3ff479c 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final CommittingConsumerRebalanceListener rebalanceListener; + private final RebalanceListener rebalanceListener; private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); diff --git a/src/main/java/de/juplo/kafka/RebalanceListener.java b/src/main/java/de/juplo/kafka/RebalanceListener.java new file mode 100644 index 0000000..26f97aa --- /dev/null +++ b/src/main/java/de/juplo/kafka/RebalanceListener.java @@ -0,0 +1,10 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + + +public interface RebalanceListener extends ConsumerRebalanceListener +{ + void enableCommits(); + void disableCommits(); +} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 7335770..8124c81 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -66,7 +66,7 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - CommittingConsumerRebalanceListener rebalanceListener; + RebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler;