Verbesserungen aus 'stored-state' nach 'rebalance-listener' gemerged
authorKai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 08:48:49 +0000 (10:48 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 13 Aug 2022 09:15:22 +0000 (11:15 +0200)
14 files changed:
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java [deleted file]
src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java [deleted file]
src/main/java/de/juplo/kafka/RecordHandler.java
src/main/java/de/juplo/kafka/StatisticsDocument.java [deleted file]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationIT.java
src/test/java/de/juplo/kafka/ApplicationTests.java
src/test/java/de/juplo/kafka/TestRecordHandler.java

index 39e9300..13176d2 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -9,7 +9,7 @@ then
   exit
 fi
 
-docker-compose up -d zookeeper kafka cli mongo express
+docker-compose up -d zookeeper kafka cli
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
index 7bcf68c..df6b321 100644 (file)
@@ -24,25 +24,6 @@ services:
     depends_on:
       - zookeeper
 
-  mongo:
-    image: mongo:4.4.13
-    ports:
-      - 27017:27017
-    environment:
-      MONGO_INITDB_ROOT_USERNAME: juplo
-      MONGO_INITDB_ROOT_PASSWORD: training
-
-  express:
-    image: mongo-express
-    ports:
-      - 8090:8081
-    environment:
-      ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
-      ME_CONFIG_MONGODB_ADMINPASSWORD: training
-      ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
-    depends_on:
-      - mongo
-
   kafka-ui:
     image: provectuslabs/kafka-ui:0.3.3
     ports:
@@ -74,5 +55,3 @@ services:
       server.port: 8080
       consumer.bootstrap-server: kafka:9092
       consumer.client-id: consumer
-      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
-      spring.data.mongodb.database: juplo
diff --git a/pom.xml b/pom.xml
index 701704d..1f5caab 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-data-mongodb</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-validation</artifactId>
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>de.flapdoodle.embed</groupId>
-      <artifactId>de.flapdoodle.embed.mongo</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 
   <build>
index 1ea90a2..7a0a8ad 100644 (file)
@@ -7,7 +7,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import java.time.Clock;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -26,15 +25,11 @@ public class ApplicationConfiguration
   @Bean
   public KeyCountingRebalanceListener keyCountingRebalanceListener(
       KeyCountingRecordHandler keyCountingRecordHandler,
-      PartitionStatisticsRepository repository,
       ApplicationProperties properties)
   {
     return new KeyCountingRebalanceListener(
         keyCountingRecordHandler,
-        repository,
-        properties.getClientId(),
-        Clock.systemDefaultZone(),
-        properties.getCommitInterval());
+        properties.getClientId());
   }
 
   @Bean
index 047d5cb..c7579b8 100644 (file)
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
+  private final ConsumerRebalanceListener consumerRebalanceListener;
   private final RecordHandler<K, V> handler;
 
   private final Lock lock = new ReentrantLock();
@@ -42,7 +42,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
+      consumer.subscribe(Arrays.asList(topic), consumerRebalanceListener);
 
       while (true)
       {
@@ -67,8 +67,6 @@ public class EndlessConsumer<K, V> implements Runnable
 
           consumed++;
         }
-
-        pollIntervalAwareRebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
index 636ff86..0ad1f31 100644 (file)
@@ -2,26 +2,20 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 
 
 @RequiredArgsConstructor
 @Slf4j
-public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+public class KeyCountingRebalanceListener implements ConsumerRebalanceListener
 {
   private final KeyCountingRecordHandler handler;
-  private final PartitionStatisticsRepository repository;
   private final String id;
-  private final Clock clock;
-  private final Duration commitInterval;
-
-  private Instant lastCommit = Instant.EPOCH;
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -30,11 +24,7 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe
     {
       Integer partition = tp.partition();
       log.info("{} - adding partition: {}", id, partition);
-      StatisticsDocument document =
-          repository
-              .findById(Integer.toString(partition))
-              .orElse(new StatisticsDocument(partition));
-      handler.addPartition(partition, document.statistics);
+      handler.addPartition(partition, new HashMap<>());
     });
   }
 
@@ -55,22 +45,6 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe
             partition,
             key);
       }
-      repository.save(new StatisticsDocument(partition, removed));
     });
   }
-
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data, last commit: {}", lastCommit);
-      handler.getSeen().forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
-              partiton,
-              statistics)));
-      lastCommit = clock.instant();
-    }
-  }
 }
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
deleted file mode 100644 (file)
index 0ccf3cd..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
-{
-  public Optional<StatisticsDocument> findById(String partition);
-}
diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
deleted file mode 100644 (file)
index 8abec12..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-
-
-public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
-{
-  default void beforeNextPoll() {}
-}
index 3c9dd15..327ac9f 100644 (file)
@@ -7,5 +7,4 @@ import java.util.function.Consumer;
 
 public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
 {
-  default void beforeNextPoll() {}
 }
diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java
deleted file mode 100644 (file)
index 415ef5c..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Document(collection = "statistics")
-@ToString
-public class StatisticsDocument
-{
-  @Id
-  public String id;
-  public Map<String, Long> statistics;
-
-  public StatisticsDocument()
-  {
-  }
-
-  public StatisticsDocument(Integer partition)
-  {
-    this.id = Integer.toString(partition);
-    this.statistics = new HashMap<>();
-  }
-
-  public StatisticsDocument(Integer partition, Map<String, Long> statistics)
-  {
-    this.id = Integer.toString(partition);
-    this.statistics = statistics;
-  }
-}
index fc1c68a..f8bfe7e 100644 (file)
@@ -25,11 +25,6 @@ info:
     group-id: ${consumer.group-id}
     topic: ${consumer.topic}
     auto-offset-reset: ${consumer.auto-offset-reset}
-spring:
-  data:
-    mongodb:
-      uri: mongodb://juplo:training@localhost:27017
-      database: juplo
 logging:
   level:
     root: INFO
index d1d8e50..2e6ac7d 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.web.client.TestRestTemplate;
 import org.springframework.boot.test.web.server.LocalServerPort;
@@ -18,7 +17,6 @@ import static de.juplo.kafka.ApplicationTests.TOPIC;
         "consumer.topic=" + TOPIC,
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC)
-@AutoConfigureDataMongo
 public class ApplicationIT
 {
   public static final String TOPIC = "FOO";
index 7f666f6..5b13b7d 100644 (file)
@@ -12,7 +12,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
@@ -41,11 +40,9 @@ import static org.awaitility.Awaitility.*;
                properties = {
                                "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
                                "consumer.topic=" + TOPIC,
-                               "consumer.commit-interval=1s",
-                               "spring.mongodb.embedded.version=4.4.13" })
+                               "consumer.commit-interval=1s" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration
-@AutoConfigureDataMongo
 @Slf4j
 class ApplicationTests
 {
index de28385..b4efdd6 100644 (file)
@@ -19,10 +19,4 @@ public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
     this.onNewRecord(record);
     handler.accept(record);
   }
-  @Override
-
-  public void beforeNextPoll()
-  {
-    handler.beforeNextPoll();
-  }
 }