WIP
authorKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 17:34:52 +0000 (19:34 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 17:34:52 +0000 (19:34 +0200)
README.sh
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java

index a2d813d..bb8bc0a 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -19,7 +19,7 @@ if [[
 ]]
 then
   docker-compose rm -svf adder-1 adder-2
-  mvn -D skipTests clean install || exit
+  mvn clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
index d913823..4e5457c 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
@@ -11,10 +10,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.core.*;
 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
 import org.springframework.kafka.listener.DefaultErrorHandler;
 import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
@@ -54,14 +50,14 @@ public class Application
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      Consumer<String, Message> kafkaConsumer,
+      ConsumerFactory<String, Message> consumerFactory,
       KafkaProperties kafkaProperties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        kafkaConsumer,
+        consumerFactory.createConsumer(),
         kafkaProperties.getConsumer().getGroupId());
   }
 
index cd36959..bffc146 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
@@ -16,9 +17,8 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
-  private final String id;
-  private final String topic;
   private final Consumer consumer;
+  private final String id;
 
   private final Set<Integer> partitions = new HashSet<>();