From: Kai Moritz Date: Fri, 16 Sep 2022 17:34:52 +0000 (+0200) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=48dcbfad9753e10e364fab39f95f70831b1c447c;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/README.sh b/README.sh index a2d813d..bb8bc0a 100755 --- a/README.sh +++ b/README.sh @@ -19,7 +19,7 @@ if [[ ]] then docker-compose rm -svf adder-1 adder-2 - mvn -D skipTests clean install || exit + mvn clean install || exit else echo "Using image existing images:" docker image ls $IMAGE diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index d913823..4e5457c 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,7 +1,6 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; @@ -11,10 +10,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaOperations; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.*; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; @@ -54,14 +50,14 @@ public class Application ApplicationRecordHandler recordHandler, AdderResults adderResults, StateRepository stateRepository, - Consumer kafkaConsumer, + ConsumerFactory consumerFactory, KafkaProperties kafkaProperties) { return new ApplicationRebalanceListener( recordHandler, adderResults, stateRepository, - kafkaConsumer, + consumerFactory.createConsumer(), kafkaProperties.getConsumer().getGroupId()); } diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index cd36959..bffc146 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; @@ -16,9 +17,8 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; private final StateRepository stateRepository; - private final String id; - private final String topic; private final Consumer consumer; + private final String id; private final Set partitions = new HashSet<>();