Springify: Merge des verschärften Tests aus der Vanilla-Version
authorKai Moritz <kai@juplo.de>
Mon, 18 Apr 2022 10:46:46 +0000 (12:46 +0200)
committerKai Moritz <kai@juplo.de>
Mon, 18 Apr 2022 11:00:53 +0000 (13:00 +0200)
* Logik zur Abfrage der Exception wiederbelebt, an der ein über eine
  Poison Pill gestolperter `KafkaConsumer` gestorben ist, damit die
  springifizierte Version den verschärften Test bestehen kann.
* Um an die Exception zu gelangen, musste eine angepasste
  Version des `CommonContainerStoppingExceptionHandler` implementiert
  werden, die sich die Exception, über die der `KafkaConsumer` gestolpert
  ist, merkt.
* Dabei auch den Health-Endpoint wiederbelebt.
* Seltsamer Weise musste dabei der Code für die AssertJ-Assertions
  angepasst werden, obwohl sich die Logik im Testfall und die Signatur der
  getesteten Methode nicht geändert hat. Vielleicht durch eine Änderung in
  den transitiv angezogenen Abhängigkeiten durch das Einbinden von
  Spring Kafka??

1  2 
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

@@@ -23,8 -28,40 +23,8 @@@ public class ApplicationConfiguratio
    }
  
    @Bean
-   public CommonContainerStoppingErrorHandler errorHandler()
 -  public EndlessConsumer<String, Long> endlessConsumer(
 -      KafkaConsumer<String, Long> kafkaConsumer,
 -      ExecutorService executor,
 -      Consumer<ConsumerRecord<String, Long>> handler,
 -      ApplicationProperties properties)
++  public ApplicationErrorHandler errorHandler()
    {
-     return new CommonContainerStoppingErrorHandler();
 -    return
 -        new EndlessConsumer<>(
 -            executor,
 -            properties.getClientId(),
 -            properties.getTopic(),
 -            kafkaConsumer,
 -            handler);
 -  }
 -
 -  @Bean
 -  public ExecutorService executor()
 -  {
 -    return Executors.newSingleThreadExecutor();
 -  }
 -
 -  @Bean(destroyMethod = "close")
 -  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
 -  {
 -    Properties props = new Properties();
 -
 -    props.put("bootstrap.servers", properties.getBootstrapServer());
 -    props.put("group.id", properties.getGroupId());
 -    props.put("client.id", properties.getClientId());
 -    props.put("auto.offset.reset", properties.getAutoOffsetReset());
 -    props.put("metadata.max.age.ms", "1000");
 -    props.put("key.deserializer", StringDeserializer.class.getName());
 -    props.put("value.deserializer", LongDeserializer.class.getName());
 -
 -    return new KafkaConsumer<>(props);
++    return new ApplicationErrorHandler();
    }
  }
index 0000000,0000000..273f509
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,61 @@@
++package de.juplo.kafka;
++
++import org.apache.kafka.clients.consumer.Consumer;
++import org.apache.kafka.clients.consumer.ConsumerRecord;
++import org.apache.kafka.clients.consumer.ConsumerRecords;
++import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
++import org.springframework.kafka.listener.MessageListenerContainer;
++
++import java.util.List;
++import java.util.Optional;
++
++
++public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler
++{
++  private Exception exception;
++
++
++  public synchronized Optional<Exception> getException()
++  {
++    return Optional.ofNullable(exception);
++  }
++
++  public synchronized void clearException()
++  {
++    this.exception = null;
++  }
++
++
++  @Override
++  public void handleOtherException(
++      Exception thrownException, Consumer<?, ?> consumer,
++      MessageListenerContainer container,
++      boolean batchListener)
++  {
++    this.exception = thrownException;
++    super.handleOtherException(thrownException, consumer, container, batchListener);
++  }
++
++  @Override
++  public void handleRemaining(
++      Exception thrownException,
++      List<ConsumerRecord<?, ?>> records,
++      Consumer<?, ?> consumer,
++      MessageListenerContainer container)
++  {
++    this.exception = thrownException;
++    super.handleRemaining(thrownException, records, consumer, container);
++  }
++
++  @Override
++  public void handleBatch(
++      Exception thrownException,
++      ConsumerRecords<?, ?> data,
++      Consumer<?, ?> consumer,
++      MessageListenerContainer container,
++      Runnable invokeListener)
++  {
++    this.exception = thrownException;
++    super.handleBatch(thrownException, data, consumer, container, invokeListener);
++  }
++}
@@@ -2,69 -2,285 +2,81 @@@ package de.juplo.kafka
  
  import lombok.RequiredArgsConstructor;
  import lombok.extern.slf4j.Slf4j;
 -import org.apache.kafka.clients.consumer.*;
 -import org.apache.kafka.common.TopicPartition;
 -import org.apache.kafka.common.errors.RecordDeserializationException;
 -import org.apache.kafka.common.errors.WakeupException;
 +import org.apache.kafka.clients.consumer.ConsumerRecord;
 +import org.springframework.beans.factory.annotation.Autowired;
 +import org.springframework.beans.factory.annotation.Value;
 +import org.springframework.kafka.annotation.KafkaListener;
 +import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 +import org.springframework.stereotype.Component;
  
 -import javax.annotation.PreDestroy;
 -import java.time.Duration;
 -import java.util.*;
 -import java.util.concurrent.ExecutionException;
 -import java.util.concurrent.ExecutorService;
 -import java.util.concurrent.locks.Condition;
 -import java.util.concurrent.locks.Lock;
 -import java.util.concurrent.locks.ReentrantLock;
++import java.util.Optional;
 +import java.util.function.Consumer;
  
  
 +@Component
  @Slf4j
  @RequiredArgsConstructor
 -public class EndlessConsumer<K, V> implements Runnable
 +public class EndlessConsumer<K, V>
  {
 -  private final ExecutorService executor;
 -  private final String id;
 -  private final String topic;
 -  private final Consumer<K, V> consumer;
 -  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
 +  @Autowired
 +  private KafkaListenerEndpointRegistry registry;
 +  @Value("${consumer.client-id}")
 +  String id;
 +  @Autowired
 +  Consumer<ConsumerRecord<K, V>> handler;
++  @Autowired
++  ApplicationErrorHandler errorHandler;
  
 -  private final Lock lock = new ReentrantLock();
 -  private final Condition condition = lock.newCondition();
 -  private boolean running = false;
 -  private Exception exception;
    private long consumed = 0;
  
 -  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
 -  private final Map<Integer, Long> offsets = new HashMap<>();
 -
 -
 -  @Override
 -  public void run()
 +  @KafkaListener(
 +      id = "${consumer.client-id}",
 +      idIsGroup = false,
 +      topics = "${consumer.topic}",
 +      autoStartup = "false")
 +  public void receive(ConsumerRecord<K, V> record)
    {
 -    try
 -    {
 -      log.info("{} - Subscribing to topic {}", id, topic);
 -      consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
 -      {
 -        @Override
 -        public void onPartitionsRevoked(Collection<TopicPartition> partitions)
 -        {
 -          partitions.forEach(tp ->
 -          {
 -            Integer partition = tp.partition();
 -            Long newOffset = consumer.position(tp);
 -            Long oldOffset = offsets.remove(partition);
 -            log.info(
 -                "{} - removing partition: {}, consumed {} records (offset {} -> {})",
 -                id,
 -                partition,
 -                newOffset - oldOffset,
 -                oldOffset,
 -                newOffset);
 -            Map<String, Long> removed = seen.remove(partition);
 -            for (String key : removed.keySet())
 -            {
 -              log.info(
 -                  "{} - Seen {} messages for partition={}|key={}",
 -                  id,
 -                  removed.get(key),
 -                  partition,
 -                  key);
 -            }
 -          });
 -        }
 -
 -        @Override
 -        public void onPartitionsAssigned(Collection<TopicPartition> partitions)
 -        {
 -          partitions.forEach(tp ->
 -          {
 -            Integer partition = tp.partition();
 -            Long offset = consumer.position(tp);
 -            log.info("{} - adding partition: {}, offset={}", id, partition, offset);
 -            offsets.put(partition, offset);
 -            seen.put(partition, new HashMap<>());
 -          });
 -        }
 -      });
 -
 -      while (true)
 -      {
 -        ConsumerRecords<K, V> records =
 -            consumer.poll(Duration.ofSeconds(1));
 -
 -        // Do something with the data...
 -        log.info("{} - Received {} messages", id, records.count());
 -        for (ConsumerRecord<K, V> record : records)
 -        {
 -          log.info(
 -              "{} - {}: {}/{} - {}={}",
 -              id,
 -              record.offset(),
 -              record.topic(),
 -              record.partition(),
 -              record.key(),
 -              record.value()
 -          );
 -
 -          handler.accept(record);
 -
 -          consumed++;
 -
 -          Integer partition = record.partition();
 -          String key = record.key() == null ? "NULL" : record.key().toString();
 -          Map<String, Long> byKey = seen.get(partition);
 -
 -          if (!byKey.containsKey(key))
 -            byKey.put(key, 0l);
 -
 -          long seenByKey = byKey.get(key);
 -          seenByKey++;
 -          byKey.put(key, seenByKey);
 -        }
 -      }
 -    }
 -    catch(WakeupException e)
 -    {
 -      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
 -      consumer.commitSync();
 -      shutdown();
 -    }
 -    catch(RecordDeserializationException e)
 -    {
 -      TopicPartition tp = e.topicPartition();
 -      long offset = e.offset();
 -      log.error(
 -          "{} - Could not deserialize  message on topic {} with offset={}: {}",
 -          id,
 -          tp,
 -          offset,
 -          e.getCause().toString());
 -
 -      consumer.commitSync();
 -      shutdown(e);
 -    }
 -    catch(Exception e)
 -    {
 -      log.error("{} - Unexpected error: {}", id, e.toString(), e);
 -      shutdown(e);
 -    }
 -    finally
 -    {
 -      log.info("{} - Consumer-Thread exiting", id);
 -    }
 +    log.info(
 +        "{} - {}: {}/{} - {}={}",
 +        id,
 +        record.offset(),
 +        record.topic(),
 +        record.partition(),
 +        record.key(),
 +        record.value()
 +    );
 +
 +    handler.accept(record);
 +
 +    consumed++;
    }
  
 -  private void shutdown()
 -  {
 -    shutdown(null);
 -  }
  
 -  private void shutdown(Exception e)
 +  public synchronized void start()
    {
 -    lock.lock();
 -    try
 -    {
 -      try
 -      {
 -        log.info("{} - Unsubscribing from topic {}", id, topic);
 -        consumer.unsubscribe();
 -      }
 -      catch (Exception ue)
 -      {
 -        log.error(
 -            "{} - Error while unsubscribing from topic {}: {}",
 -            id,
 -            topic,
 -            ue.toString());
 -      }
 -      finally
 -      {
 -        running = false;
 -        exception = e;
 -        condition.signal();
 -      }
 -    }
 -    finally
 -    {
 -      lock.unlock();
 -    }
 -  }
 -
 -  public Map<Integer, Map<String, Long>> getSeen()
 -  {
 -    return seen;
 -  }
 -
 -  public void start()
 -  {
 -    lock.lock();
 -    try
 -    {
 -      if (running)
 -        throw new IllegalStateException("Consumer instance " + id + " is already running!");
 +    if (registry.getListenerContainer(id).isChildRunning())
 +      throw new IllegalStateException("Consumer instance " + id + " is already running!");
  
 -      log.info("{} - Starting - consumed {} messages before", id, consumed);
 -      running = true;
 -      exception = null;
 -      executor.submit(this);
 -    }
 -    finally
 -    {
 -      lock.unlock();
 -    }
 +    log.info("{} - Starting - consumed {} messages before", id, consumed);
++    errorHandler.clearException();
 +    registry.getListenerContainer(id).start();
    }
  
 -  public synchronized void stop() throws ExecutionException, InterruptedException
 +  public synchronized void stop()
    {
 -    lock.lock();
 -    try
 -    {
 -      if (!running)
 -        throw new IllegalStateException("Consumer instance " + id + " is not running!");
 +    if (!registry.getListenerContainer(id).isChildRunning())
 +      throw new IllegalStateException("Consumer instance " + id + " is not running!");
  
 -      log.info("{} - Stopping", id);
 -      consumer.wakeup();
 -      condition.await();
 -      log.info("{} - Stopped - consumed {} messages so far", id, consumed);
 -    }
 -    finally
 -    {
 -      lock.unlock();
 -    }
 -  }
 -
 -  @PreDestroy
 -  public void destroy() throws ExecutionException, InterruptedException
 -  {
 -    log.info("{} - Destroy!", id);
 -    try
 -    {
 -      stop();
 -    }
 -    catch (IllegalStateException e)
 -    {
 -      log.info("{} - Was already stopped", id);
 -    }
 -    catch (Exception e)
 -    {
 -      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
 -    }
 -    finally
 -    {
 -      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
 -    }
 -  }
 -
 -  public boolean running()
 -  {
 -    lock.lock();
 -    try
 -    {
 -      return running;
 -    }
 -    finally
 -    {
 -      lock.unlock();
 -    }
 +    log.info("{} - Stopping", id);
 +    registry.getListenerContainer(id).stop();
 +    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
    }
 -  public Optional<Exception> exitStatus()
 -    lock.lock();
 -    try
 -    {
 -      if (running)
 -        throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
++  public synchronized Optional<Exception> exitStatus()
+   {
 -      return Optional.ofNullable(exception);
 -    }
 -    finally
 -    {
 -      lock.unlock();
 -    }
++    if (registry.getListenerContainer(id).isChildRunning())
++      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
++    return errorHandler.getException();
+   }
  }
@@@ -6,14 -6,11 +6,13 @@@ import org.apache.kafka.clients.consume
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.BytesDeserializer;
- import org.apache.kafka.common.serialization.BytesSerializer;
- import org.apache.kafka.common.serialization.LongSerializer;
- import org.apache.kafka.common.serialization.StringSerializer;
+ import org.apache.kafka.common.errors.RecordDeserializationException;
+ import org.apache.kafka.common.serialization.*;
  import org.apache.kafka.common.utils.Bytes;
++import org.assertj.core.api.OptionalAssert;
  import org.junit.jupiter.api.*;
  import org.springframework.beans.factory.annotation.Autowired;
 +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
  import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
  import org.springframework.boot.test.context.TestConfiguration;
  import org.springframework.context.annotation.Bean;
@@@ -114,6 -122,13 +118,13 @@@ class ApplicationTest
                assertThat(receivedRecords.size())
                                .describedAs("Received not all sent events")
                                .isLessThan(100);
 -              assertThat(endlessConsumer.exitStatus())
+               assertThatNoException()
+                               .describedAs("Consumer should not be running")
+                               .isThrownBy(() -> endlessConsumer.exitStatus());
++              ((OptionalAssert<Exception>)assertThat(endlessConsumer.exitStatus()))
+                               .describedAs("Consumer should have exited abnormally")
+                               .containsInstanceOf(RecordDeserializationException.class);
        }
  
  
        @Import(ApplicationConfiguration.class)
        public static class Configuration
        {
 +              @Primary
 +              @Bean
 +              public Consumer<ConsumerRecord<String, Long>> testHandler()
 +              {
 +                      return new RecordHandler();
 +              }
 +
+               @Bean
+               Serializer<Long> serializer()
+               {
+                       return new LongSerializer();
+               }
                @Bean
                KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
                {