From: Kai Moritz Date: Mon, 18 Apr 2022 10:46:46 +0000 (+0200) Subject: Springify: Merge des verschärften Tests aus der Vanilla-Version X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=95ada5445e5db63f53f9c36d55ba862459ea923e;p=demos%2Fkafka%2Ftraining Springify: Merge des verschärften Tests aus der Vanilla-Version * Logik zur Abfrage der Exception wiederbelebt, an der ein über eine Poison Pill gestolperter `KafkaConsumer` gestorben ist, damit die springifizierte Version den verschärften Test bestehen kann. * Um an die Exception zu gelangen, musste eine angepasste Version des `CommonContainerStoppingExceptionHandler` implementiert werden, die sich die Exception, über die der `KafkaConsumer` gestolpert ist, merkt. * Dabei auch den Health-Endpoint wiederbelebt. * Seltsamer Weise musste dabei der Code für die AssertJ-Assertions angepasst werden, obwohl sich die Logik im Testfall und die Signatur der getesteten Methode nicht geändert hat. Vielleicht durch eine Änderung in den transitiv angezogenen Abhängigkeiten durch das Einbinden von Spring Kafka?? --- 95ada5445e5db63f53f9c36d55ba862459ea923e diff --cc src/main/java/de/juplo/kafka/ApplicationConfiguration.java index fd4ff28,4054e93..b67f795 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@@ -23,8 -28,40 +23,8 @@@ public class ApplicationConfiguratio } @Bean - public CommonContainerStoppingErrorHandler errorHandler() - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, - ExecutorService executor, - Consumer> handler, - ApplicationProperties properties) ++ public ApplicationErrorHandler errorHandler() { - return new CommonContainerStoppingErrorHandler(); - return - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - handler); - } - - @Bean - public ExecutorService executor() - { - return Executors.newSingleThreadExecutor(); - } - - @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) - { - Properties props = new Properties(); - - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("group.id", properties.getGroupId()); - props.put("client.id", properties.getClientId()); - props.put("auto.offset.reset", properties.getAutoOffsetReset()); - props.put("metadata.max.age.ms", "1000"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", LongDeserializer.class.getName()); - - return new KafkaConsumer<>(props); ++ return new ApplicationErrorHandler(); } } diff --cc src/main/java/de/juplo/kafka/ApplicationErrorHandler.java index 0000000,0000000..273f509 new file mode 100644 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java @@@ -1,0 -1,0 +1,61 @@@ ++package de.juplo.kafka; ++ ++import org.apache.kafka.clients.consumer.Consumer; ++import org.apache.kafka.clients.consumer.ConsumerRecord; ++import org.apache.kafka.clients.consumer.ConsumerRecords; ++import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler; ++import org.springframework.kafka.listener.MessageListenerContainer; ++ ++import java.util.List; ++import java.util.Optional; ++ ++ ++public class ApplicationErrorHandler extends CommonContainerStoppingErrorHandler ++{ ++ private Exception exception; ++ ++ ++ public synchronized Optional getException() ++ { ++ return Optional.ofNullable(exception); ++ } ++ ++ public synchronized void clearException() ++ { ++ this.exception = null; ++ } ++ ++ ++ @Override ++ public void handleOtherException( ++ Exception thrownException, Consumer consumer, ++ MessageListenerContainer container, ++ boolean batchListener) ++ { ++ this.exception = thrownException; ++ super.handleOtherException(thrownException, consumer, container, batchListener); ++ } ++ ++ @Override ++ public void handleRemaining( ++ Exception thrownException, ++ List> records, ++ Consumer consumer, ++ MessageListenerContainer container) ++ { ++ this.exception = thrownException; ++ super.handleRemaining(thrownException, records, consumer, container); ++ } ++ ++ @Override ++ public void handleBatch( ++ Exception thrownException, ++ ConsumerRecords data, ++ Consumer consumer, ++ MessageListenerContainer container, ++ Runnable invokeListener) ++ { ++ this.exception = thrownException; ++ super.handleBatch(thrownException, data, consumer, container, invokeListener); ++ } ++} diff --cc src/main/java/de/juplo/kafka/EndlessConsumer.java index 888805f,b173b12..15e1b4e --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@@ -2,69 -2,285 +2,81 @@@ package de.juplo.kafka import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.stereotype.Component; -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; ++import java.util.Optional; +import java.util.function.Consumer; +@Component @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements Runnable +public class EndlessConsumer { - private final ExecutorService executor; - private final String id; - private final String topic; - private final Consumer consumer; - private final java.util.function.Consumer> handler; + @Autowired + private KafkaListenerEndpointRegistry registry; + @Value("${consumer.client-id}") + String id; + @Autowired + Consumer> handler; ++ @Autowired ++ ApplicationErrorHandler errorHandler; - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private boolean running = false; - private Exception exception; private long consumed = 0; - private final Map> seen = new HashMap<>(); - private final Map offsets = new HashMap<>(); - - - @Override - public void run() + @KafkaListener( + id = "${consumer.client-id}", + idIsGroup = false, + topics = "${consumer.topic}", + autoStartup = "false") + public void receive(ConsumerRecord record) { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() - { - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - Long oldOffset = offsets.remove(partition); - log.info( - "{} - removing partition: {}, consumed {} records (offset {} -> {})", - id, - partition, - newOffset - oldOffset, - oldOffset, - newOffset); - Map removed = seen.remove(partition); - for (String key : removed.keySet()) - { - log.info( - "{} - Seen {} messages for partition={}|key={}", - id, - removed.get(key), - partition, - key); - } - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info("{} - adding partition: {}, offset={}", id, partition, offset); - offsets.put(partition, offset); - seen.put(partition, new HashMap<>()); - }); - } - }); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - - handler.accept(record); - - consumed++; - - Integer partition = record.partition(); - String key = record.key() == null ? "NULL" : record.key().toString(); - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0l); - - long seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); - } - } - } - catch(WakeupException e) - { - log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); - shutdown(); - } - catch(RecordDeserializationException e) - { - TopicPartition tp = e.topicPartition(); - long offset = e.offset(); - log.error( - "{} - Could not deserialize message on topic {} with offset={}: {}", - id, - tp, - offset, - e.getCause().toString()); - - consumer.commitSync(); - shutdown(e); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString(), e); - shutdown(e); - } - finally - { - log.info("{} - Consumer-Thread exiting", id); - } + log.info( + "{} - {}: {}/{} - {}={}", + id, + record.offset(), + record.topic(), + record.partition(), + record.key(), + record.value() + ); + + handler.accept(record); + + consumed++; } - private void shutdown() - { - shutdown(null); - } - private void shutdown(Exception e) + public synchronized void start() { - lock.lock(); - try - { - try - { - log.info("{} - Unsubscribing from topic {}", id, topic); - consumer.unsubscribe(); - } - catch (Exception ue) - { - log.error( - "{} - Error while unsubscribing from topic {}: {}", - id, - topic, - ue.toString()); - } - finally - { - running = false; - exception = e; - condition.signal(); - } - } - finally - { - lock.unlock(); - } - } - - public Map> getSeen() - { - return seen; - } - - public void start() - { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); + if (registry.getListenerContainer(id).isChildRunning()) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - log.info("{} - Starting - consumed {} messages before", id, consumed); - running = true; - exception = null; - executor.submit(this); - } - finally - { - lock.unlock(); - } + log.info("{} - Starting - consumed {} messages before", id, consumed); ++ errorHandler.clearException(); + registry.getListenerContainer(id).start(); } - public synchronized void stop() throws ExecutionException, InterruptedException + public synchronized void stop() { - lock.lock(); - try - { - if (!running) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); + if (!registry.getListenerContainer(id).isChildRunning()) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); - log.info("{} - Stopping", id); - consumer.wakeup(); - condition.await(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - finally - { - lock.unlock(); - } - } - - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException - { - log.info("{} - Destroy!", id); - try - { - stop(); - } - catch (IllegalStateException e) - { - log.info("{} - Was already stopped", id); - } - catch (Exception e) - { - log.error("{} - Unexpected exception while trying to stop the consumer", id, e); - } - finally - { - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - public boolean running() - { - lock.lock(); - try - { - return running; - } - finally - { - lock.unlock(); - } + log.info("{} - Stopping", id); + registry.getListenerContainer(id).stop(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); } + - public Optional exitStatus() ++ public synchronized Optional exitStatus() + { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); ++ if (registry.getListenerContainer(id).isChildRunning()) ++ throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + - return Optional.ofNullable(exception); - } - finally - { - lock.unlock(); - } ++ return errorHandler.getException(); + } } diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java index 3ded0d2,40dc149..1d3546c --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@@ -6,14 -6,11 +6,13 @@@ import org.apache.kafka.clients.consume import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; - import org.apache.kafka.common.serialization.BytesDeserializer; - import org.apache.kafka.common.serialization.BytesSerializer; - import org.apache.kafka.common.serialization.LongSerializer; - import org.apache.kafka.common.serialization.StringSerializer; + import org.apache.kafka.common.errors.RecordDeserializationException; + import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; ++import org.assertj.core.api.OptionalAssert; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; @@@ -114,6 -122,13 +118,13 @@@ class ApplicationTest assertThat(receivedRecords.size()) .describedAs("Received not all sent events") .isLessThan(100); + + assertThatNoException() + .describedAs("Consumer should not be running") + .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) ++ ((OptionalAssert)assertThat(endlessConsumer.exitStatus())) + .describedAs("Consumer should have exited abnormally") + .containsInstanceOf(RecordDeserializationException.class); } @@@ -271,13 -281,12 +282,19 @@@ @Import(ApplicationConfiguration.class) public static class Configuration { + @Primary + @Bean + public Consumer> testHandler() + { + return new RecordHandler(); + } + + @Bean + Serializer serializer() + { + return new LongSerializer(); + } + @Bean KafkaProducer kafkaProducer(ApplicationProperties properties) {