Springify: core functionality of EndlessConsumer via Spring Kafka
author     Kai Moritz <kai@juplo.de>
           Tue, 12 Apr 2022 22:38:24 +0000 (00:38 +0200)
committer  Kai Moritz <kai@juplo.de>
           Fri, 15 Apr 2022 08:29:52 +0000 (10:29 +0200)
* Removed all other functionality for this first experiment
* Adapted the test case accordingly
* The commit happens here because Spring Kafka activates its own commit
  mode `BATCH` by default, which performs a (synchronous!) commit after
  every processed `poll()` (see the first sketch below)
* The test is green; if the app is started normally, however, it gets
  caught in an endless loop, because no error handler is configured that
  could handle the `RecordDeserializationException` correctly. The app
  thus ends up in a livelock, in which it receives the record that
  triggered the error over and over again. Unlike a vanilla consumer it
  is not terminated by this, because Spring Kafka catches the exception
  and carries on: a new `poll()` for the same position, so that from the
  GroupCoordinator's point of view the app is still alive (see the
  second sketch below)
* Enabled DEBUG logging for `org.springframework.kafka`, so that the
  commits become visible
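
For reference, a minimal sketch of the default just described. This is
not part of the commit; the class name `CommitModeConfiguration` is made
up, and setting `AckMode.BATCH` merely spells out what Spring Kafka
configures anyway when nothing else is specified:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class CommitModeConfiguration
{
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Long> kafkaListenerContainerFactory(
      ConsumerFactory<String, Long> consumerFactory)
  {
    ConcurrentKafkaListenerContainerFactory<String, Long> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // BATCH: after the listener has processed all records returned by a
    // poll(), commit their offsets -- synchronously, because syncCommits
    // also defaults to true.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
  }
}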

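One way to break the livelock described above, sketched under the
assumption of Spring Kafka 2.8+ (the class and bean names are made up):
wrap the value deserializer in an `ErrorHandlingDeserializer`, e.g. by
setting `spring.kafka.consumer.value-deserializer` to
`org.springframework.kafka.support.serializer.ErrorHandlingDeserializer`
and `spring.deserializer.value.delegate.class` to
`org.apache.kafka.common.serialization.LongDeserializer`, so that a
poison pill no longer throws inside `poll()` but reaches the container
as a failed record, and then register an error handler that skips it:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class PoisonPillConfiguration
{
  @Bean
  public DefaultErrorHandler errorHandler()
  {
    // A record that cannot be deserialized will never succeed on retry:
    // retry zero times, let the default recoverer log the record, and
    // seek past it, so that the next poll() makes progress.
    return new DefaultErrorHandler(new FixedBackOff(0L, 0L));
  }
}

Spring Boot's auto-configuration should pick up a single error-handler
bean like this one and install it on its default listener container
factory.
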
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/resources/application.yml
src/main/resources/logback.xml
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index f218085..21466ec 100644
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index 6601e6d..76ba717 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -1,63 +1,14 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
-import javax.annotation.PreDestroy;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
 
 @SpringBootApplication
 @Slf4j
-public class Application implements ApplicationRunner
+public class Application
 {
-  @Autowired
-  EndlessConsumer endlessConsumer;
-  @Autowired
-  ExecutorService executor;
-
-
-  @Override
-  public void run(ApplicationArguments args) throws Exception
-  {
-    log.info("Starting EndlessConsumer");
-    endlessConsumer.start();
-  }
-
-  @PreDestroy
-  public void stopExecutor()
-  {
-    try
-    {
-      log.info("Shutting down the ExecutorService.");
-      executor.shutdown();
-      log.info("Waiting 5 seconds for the ExecutorService to terminate...");
-      executor.awaitTermination(5, TimeUnit.SECONDS);
-    }
-    catch (InterruptedException e)
-    {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
-    }
-    finally
-    {
-      if (!executor.isShutdown())
-      {
-        log.warn("Forcing shutdown of ExecutorService!");
-        executor
-            .shutdownNow()
-            .forEach(runnable -> log.warn("Unfinished task: {}", runnable.getClass().getSimpleName()));
-      }
-      log.info("Shutdow of ExecutorService finished");
-    }
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 4054e93..5cefa32 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,16 +1,10 @@
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
 
@@ -26,42 +20,4 @@ public class ApplicationConfiguration
       // Handle record
     };
   }
-
-  @Bean
-  public EndlessConsumer<String, Long> endlessConsumer(
-      KafkaConsumer<String, Long> kafkaConsumer,
-      ExecutorService executor,
-      Consumer<ConsumerRecord<String, Long>> handler,
-      ApplicationProperties properties)
-  {
-    return
-        new EndlessConsumer<>(
-            executor,
-            properties.getClientId(),
-            properties.getTopic(),
-            kafkaConsumer,
-            handler);
-  }
-
-  @Bean
-  public ExecutorService executor()
-  {
-    return Executors.newSingleThreadExecutor();
-  }
-
-  @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
-  {
-    Properties props = new Properties();
-
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("group.id", properties.getGroupId());
-    props.put("client.id", properties.getClientId());
-    props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("metadata.max.age.ms", "1000");
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", LongDeserializer.class.getName());
-
-    return new KafkaConsumer<>(props);
-  }
 }
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index dc3a26e..0244a05 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,19 +10,12 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Long> consumer;
-
-
   @Override
   public Health health()
   {
     try
     {
-      return consumer
-          .exitStatus()
-          .map(Health::down)
-          .orElse(Health.outOfService())
-          .build();
+      return Health.up().build();
     }
     catch (IllegalStateException e)
     {
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index 93580ee..8ca3e2a 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -16,25 +17,20 @@ import java.util.concurrent.ExecutionException;
 @RequiredArgsConstructor
 public class DriverController
 {
-  private final EndlessConsumer consumer;
-
-
   @PostMapping("start")
   public void start()
   {
-    consumer.start();
   }
 
   @PostMapping("stop")
   public void stop() throws ExecutionException, InterruptedException
   {
-    consumer.stop();
   }
 
   @GetMapping("seen")
   public Map<Integer, Map<String, Long>> seen()
   {
-    return consumer.getSeen();
+    return new HashMap<>();
   }
 
   @ExceptionHandler
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index b173b12..87780b4 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -2,285 +2,39 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
-import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
 
-import javax.annotation.PreDestroy;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 
+@Component
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements Runnable
+public class EndlessConsumer<K, V>
 {
-  private final ExecutorService executor;
-  private final String id;
-  private final String topic;
-  private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
-
-  private final Lock lock = new ReentrantLock();
-  private final Condition condition = lock.newCondition();
-  private boolean running = false;
-  private Exception exception;
-  private long consumed = 0;
-
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> offsets = new HashMap<>();
-
-
-  @Override
-  public void run()
-  {
-    try
-    {
-      log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
-      {
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-        {
-          partitions.forEach(tp ->
-          {
-            Integer partition = tp.partition();
-            Long newOffset = consumer.position(tp);
-            Long oldOffset = offsets.remove(partition);
-            log.info(
-                "{} - removing partition: {}, consumed {} records (offset {} -> {})",
-                id,
-                partition,
-                newOffset - oldOffset,
-                oldOffset,
-                newOffset);
-            Map<String, Long> removed = seen.remove(partition);
-            for (String key : removed.keySet())
-            {
-              log.info(
-                  "{} - Seen {} messages for partition={}|key={}",
-                  id,
-                  removed.get(key),
-                  partition,
-                  key);
-            }
-          });
-        }
-
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-        {
-          partitions.forEach(tp ->
-          {
-            Integer partition = tp.partition();
-            Long offset = consumer.position(tp);
-            log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-            offsets.put(partition, offset);
-            seen.put(partition, new HashMap<>());
-          });
-        }
-      });
-
-      while (true)
-      {
-        ConsumerRecords<K, V> records =
-            consumer.poll(Duration.ofSeconds(1));
-
-        // Do something with the data...
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<K, V> record : records)
-        {
-          log.info(
-              "{} - {}: {}/{} - {}={}",
-              id,
-              record.offset(),
-              record.topic(),
-              record.partition(),
-              record.key(),
-              record.value()
-          );
-
-          handler.accept(record);
-
-          consumed++;
-
-          Integer partition = record.partition();
-          String key = record.key() == null ? "NULL" : record.key().toString();
-          Map<String, Long> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0l);
-
-          long seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
-        }
-      }
-    }
-    catch(WakeupException e)
-    {
-      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
-      shutdown();
-    }
-    catch(RecordDeserializationException e)
-    {
-      TopicPartition tp = e.topicPartition();
-      long offset = e.offset();
-      log.error(
-          "{} - Could not deserialize  message on topic {} with offset={}: {}",
-          id,
-          tp,
-          offset,
-          e.getCause().toString());
-
-      consumer.commitSync();
-      shutdown(e);
-    }
-    catch(Exception e)
-    {
-      log.error("{} - Unexpected error: {}", id, e.toString(), e);
-      shutdown(e);
-    }
-    finally
-    {
-      log.info("{} - Consumer-Thread exiting", id);
-    }
-  }
-
-  private void shutdown()
-  {
-    shutdown(null);
-  }
-
-  private void shutdown(Exception e)
-  {
-    lock.lock();
-    try
-    {
-      try
-      {
-        log.info("{} - Unsubscribing from topic {}", id, topic);
-        consumer.unsubscribe();
-      }
-      catch (Exception ue)
-      {
-        log.error(
-            "{} - Error while unsubscribing from topic {}: {}",
-            id,
-            topic,
-            ue.toString());
-      }
-      finally
-      {
-        running = false;
-        exception = e;
-        condition.signal();
-      }
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
-  public Map<Integer, Map<String, Long>> getSeen()
-  {
-    return seen;
-  }
-
-  public void start()
-  {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-      log.info("{} - Starting - consumed {} messages before", id, consumed);
-      running = true;
-      exception = null;
-      executor.submit(this);
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
-  public synchronized void stop() throws ExecutionException, InterruptedException
-  {
-    lock.lock();
-    try
-    {
-      if (!running)
-        throw new IllegalStateException("Consumer instance " + id + " is not running!");
-
-      log.info("{} - Stopping", id);
-      consumer.wakeup();
-      condition.await();
-      log.info("{} - Stopped - consumed {} messages so far", id, consumed);
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
-  @PreDestroy
-  public void destroy() throws ExecutionException, InterruptedException
-  {
-    log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
-  }
-
-  public boolean running()
-  {
-    lock.lock();
-    try
-    {
-      return running;
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
-  public Optional<Exception> exitStatus()
-  {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
-      return Optional.ofNullable(exception);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+  @Value("${consumer.client-id}")
+  String id;
+  @Autowired
+  Consumer<ConsumerRecord<K, V>> handler;
+
+
+  @KafkaListener(topics = "${consumer.topic}")
+  public void receive(ConsumerRecord<K, V> record)
+  {
+    log.info(
+        "{} - {}: {}/{} - {}={}",
+        id,
+        record.offset(),
+        record.topic(),
+        record.partition(),
+        record.key(),
+        record.value()
+    );
+
+    handler.accept(record);
   }
 }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 9f3cb81..1cb6212 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -24,6 +24,14 @@ info:
     group-id: ${consumer.group-id}
     topic: ${consumer.topic}
     auto-offset-reset: ${consumer.auto-offset-reset}
+spring:
+  kafka:
+    consumer:
+      bootstrap-servers: ${consumer.bootstrap-server}
+      client-id: ${consumer.client-id}
+      auto-offset-reset: ${consumer.auto-offset-reset}
+      group-id: ${consumer.group-id}
+      value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
 logging:
   level:
     root: INFO
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index b8e6780..c41e588 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -9,6 +9,7 @@
 
   <logger name="de.juplo" level="TRACE"/>
   <!-- logger name="org.apache.kafka.clients" level="DEBUG" / -->
+  <logger name="org.springframework.kafka" level="DEBUG"/>
 
   <root level="INFO">
     <appender-ref ref="STDOUT" />
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 6f58180..cbf215e 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -13,10 +13,12 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@@ -24,7 +26,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -37,7 +38,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.*;
 
 
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@SpringJUnitConfig(
+               initializers = ConfigDataApplicationContextInitializer.class,
+    classes = {
+                               EndlessConsumer.class,
+                               KafkaAutoConfiguration.class,
+                               ApplicationTests.Configuration.class })
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestPropertySource(
                properties = {
@@ -57,16 +63,12 @@ class ApplicationTests
        @Autowired
        KafkaProducer<String, Bytes> kafkaProducer;
        @Autowired
-       KafkaConsumer<String, Long> kafkaConsumer;
-       @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
        ApplicationProperties properties;
        @Autowired
-       ExecutorService executor;
+       RecordHandler recordHandler;
 
-       Consumer<ConsumerRecord<String, Long>> testHandler;
-       EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
        Map<TopicPartition, Long> newOffsets;
 
@@ -80,7 +82,7 @@ class ApplicationTests
                send100Messages(i ->  new Bytes(longSerializer.serialize(TOPIC, i)));
 
                Set<ConsumerRecord<String, Long>> received = new HashSet<>();
-               testHandler = record -> received.add(record);
+               recordHandler.testHandler = record -> received.add(record);
 
                await("100 records received")
                                .atMost(Duration.ofSeconds(30))
@@ -106,17 +108,8 @@ class ApplicationTests
 
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
-                               .until(() -> !endlessConsumer.running());
+                               .untilAsserted(() -> checkSeenOffsetsForProgress());
 
-               checkSeenOffsetsForProgress();
-               compareToCommitedOffsets(newOffsets);
-
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
                compareToCommitedOffsets(newOffsets);
        }
 
@@ -215,7 +208,7 @@ class ApplicationTests
        @BeforeEach
        public void init()
        {
-               testHandler = record -> {} ;
+               recordHandler.testHandler = (record) -> {};
 
                oldOffsets = new HashMap<>();
                newOffsets = new HashMap<>();
@@ -226,44 +219,40 @@ class ApplicationTests
                        newOffsets.put(tp, offset - 1);
                });
 
-               Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
+               recordHandler.captureOffsets =
                                record ->
-                               {
                                        newOffsets.put(
                                                        new TopicPartition(record.topic(), record.partition()),
                                                        record.offset());
-                                       testHandler.accept(record);
-                               };
-
-               endlessConsumer =
-                               new EndlessConsumer<>(
-                                               executor,
-                                               properties.getClientId(),
-                                               properties.getTopic(),
-                                               kafkaConsumer,
-                                               captureOffsetAndExecuteTestHandler);
-
-               endlessConsumer.start();
        }
 
-       @AfterEach
-       public void deinit()
+
+       public static class RecordHandler implements Consumer<ConsumerRecord<String, Long>>
        {
-               try
-               {
-                       endlessConsumer.stop();
-               }
-               catch (Exception e)
+               Consumer<ConsumerRecord<String, Long>> captureOffsets;
+               Consumer<ConsumerRecord<String, Long>> testHandler;
+
+
+               @Override
+               public void accept(ConsumerRecord<String, Long> record)
                {
-                       log.info("Exception while stopping the consumer: {}", e.toString());
+                       captureOffsets
+                                       .andThen(testHandler)
+                                       .accept(record);
                }
        }
 
-
        @TestConfiguration
        @Import(ApplicationConfiguration.class)
        public static class Configuration
        {
+               @Primary
+               @Bean
+               public Consumer<ConsumerRecord<String, Long>> testHandler()
+               {
+                       return new RecordHandler();
+               }
+
                @Bean
                KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
                {