From a9200a876060edc8683dfd6d0d16c23407c189ad Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Apr 2022 00:38:24 +0200 Subject: [PATCH] =?utf8?q?Springify:=20Kernfunktion=20von=20EndlessConsume?= =?utf8?q?r=20=C3=BCber=20Spring-Kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Alle weiteren Funktionen für dieses erste Experiment erst mal entfernt * Testfall entsprechend angepasst * Der Commit passiert hier, weil Spring Kafka per Default den eignen Commit-Modus `BATCH` aktiviert, der nach jedem abgearbeiteten `poll()` einen (synchronen!) Commit durchführt * Der Test ist zwar grün, wenn man die App normal startet, verfängt sie sich jedoch in einer Endlosschleife, da kein Error-Handler konfiguriert ist, der die `RecordDeserializationException` korrekt behandeln kann, so dass sich die App in einem Life-Deadlock befindet, in dem sie immer wieder den Datensatz erhält, der den Fehler ausgelöst hat, dadurch aber nicht wie ein Vanilla-Consumer beendet wird, da Spring Kafka die Exception abfängt und weitermacht: neuer `poll()` für die selbe Position, so dass die App aus Sicht des GroupCoordinator noch lebt. * DEBUG-Logging für `org.springframework.kafka` aktiviert, damit man die Commits sieht. --- pom.xml | 4 + src/main/java/de/juplo/kafka/Application.java | 51 +-- .../juplo/kafka/ApplicationConfiguration.java | 44 --- .../kafka/ApplicationHealthIndicator.java | 9 +- .../java/de/juplo/kafka/DriverController.java | 8 +- .../java/de/juplo/kafka/EndlessConsumer.java | 302 ++---------------- src/main/resources/application.yml | 8 + src/main/resources/logback.xml | 1 + .../java/de/juplo/kafka/ApplicationTests.java | 73 ++--- 9 files changed, 76 insertions(+), 424 deletions(-) diff --git a/pom.xml b/pom.xml index f218085..21466ec 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,10 @@ org.apache.kafka kafka-clients + + org.springframework.kafka + spring-kafka + org.projectlombok lombok diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 6601e6d..76ba717 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,63 +1,14 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import javax.annotation.PreDestroy; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - @SpringBootApplication @Slf4j -public class Application implements ApplicationRunner +public class Application { - @Autowired - EndlessConsumer endlessConsumer; - @Autowired - ExecutorService executor; - - - @Override - public void run(ApplicationArguments args) throws Exception - { - log.info("Starting EndlessConsumer"); - endlessConsumer.start(); - } - - @PreDestroy - public void stopExecutor() - { - try - { - log.info("Shutting down the ExecutorService."); - executor.shutdown(); - log.info("Waiting 5 seconds for the ExecutorService to terminate..."); - executor.awaitTermination(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString()); - } - finally - { - if (!executor.isShutdown()) - { - log.warn("Forcing shutdown of ExecutorService!"); - executor - .shutdownNow() - .forEach(runnable -> log.warn("Unfinished task: {}", runnable.getClass().getSimpleName())); - } - log.info("Shutdow of ExecutorService finished"); - } - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 4054e93..5cefa32 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,16 +1,10 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Consumer; @@ -26,42 +20,4 @@ public class ApplicationConfiguration // Handle record }; } - - @Bean - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, - ExecutorService executor, - Consumer> handler, - ApplicationProperties properties) - { - return - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - handler); - } - - @Bean - public ExecutorService executor() - { - return Executors.newSingleThreadExecutor(); - } - - @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) - { - Properties props = new Properties(); - - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("group.id", properties.getGroupId()); - props.put("client.id", properties.getClientId()); - props.put("auto.offset.reset", properties.getAutoOffsetReset()); - props.put("metadata.max.age.ms", "1000"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", LongDeserializer.class.getName()); - - return new KafkaConsumer<>(props); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index dc3a26e..0244a05 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,19 +10,12 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; - - @Override public Health health() { try { - return consumer - .exitStatus() - .map(Health::down) - .orElse(Health.outOfService()) - .build(); + return Health.up().build(); } catch (IllegalStateException e) { diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 93580ee..8ca3e2a 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -16,25 +17,20 @@ import java.util.concurrent.ExecutionException; @RequiredArgsConstructor public class DriverController { - private final EndlessConsumer consumer; - - @PostMapping("start") public void start() { - consumer.start(); } @PostMapping("stop") public void stop() throws ExecutionException, InterruptedException { - consumer.stop(); } @GetMapping("seen") public Map> seen() { - return consumer.getSeen(); + return new HashMap<>(); } @ExceptionHandler diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index b173b12..87780b4 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -2,285 +2,39 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +@Component @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements Runnable +public class EndlessConsumer { - private final ExecutorService executor; - private final String id; - private final String topic; - private final Consumer consumer; - private final java.util.function.Consumer> handler; - - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private boolean running = false; - private Exception exception; - private long consumed = 0; - - private final Map> seen = new HashMap<>(); - private final Map offsets = new HashMap<>(); - - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() - { - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - Long oldOffset = offsets.remove(partition); - log.info( - "{} - removing partition: {}, consumed {} records (offset {} -> {})", - id, - partition, - newOffset - oldOffset, - oldOffset, - newOffset); - Map removed = seen.remove(partition); - for (String key : removed.keySet()) - { - log.info( - "{} - Seen {} messages for partition={}|key={}", - id, - removed.get(key), - partition, - key); - } - }); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info("{} - adding partition: {}, offset={}", id, partition, offset); - offsets.put(partition, offset); - seen.put(partition, new HashMap<>()); - }); - } - }); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - - handler.accept(record); - - consumed++; - - Integer partition = record.partition(); - String key = record.key() == null ? "NULL" : record.key().toString(); - Map byKey = seen.get(partition); - - if (!byKey.containsKey(key)) - byKey.put(key, 0l); - - long seenByKey = byKey.get(key); - seenByKey++; - byKey.put(key, seenByKey); - } - } - } - catch(WakeupException e) - { - log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); - shutdown(); - } - catch(RecordDeserializationException e) - { - TopicPartition tp = e.topicPartition(); - long offset = e.offset(); - log.error( - "{} - Could not deserialize message on topic {} with offset={}: {}", - id, - tp, - offset, - e.getCause().toString()); - - consumer.commitSync(); - shutdown(e); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString(), e); - shutdown(e); - } - finally - { - log.info("{} - Consumer-Thread exiting", id); - } - } - - private void shutdown() - { - shutdown(null); - } - - private void shutdown(Exception e) - { - lock.lock(); - try - { - try - { - log.info("{} - Unsubscribing from topic {}", id, topic); - consumer.unsubscribe(); - } - catch (Exception ue) - { - log.error( - "{} - Error while unsubscribing from topic {}: {}", - id, - topic, - ue.toString()); - } - finally - { - running = false; - exception = e; - condition.signal(); - } - } - finally - { - lock.unlock(); - } - } - - public Map> getSeen() - { - return seen; - } - - public void start() - { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - running = true; - exception = null; - executor.submit(this); - } - finally - { - lock.unlock(); - } - } - - public synchronized void stop() throws ExecutionException, InterruptedException - { - lock.lock(); - try - { - if (!running) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - condition.await(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - finally - { - lock.unlock(); - } - } - - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException - { - log.info("{} - Destroy!", id); - try - { - stop(); - } - catch (IllegalStateException e) - { - log.info("{} - Was already stopped", id); - } - catch (Exception e) - { - log.error("{} - Unexpected exception while trying to stop the consumer", id, e); - } - finally - { - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - public boolean running() - { - lock.lock(); - try - { - return running; - } - finally - { - lock.unlock(); - } - } - - public Optional exitStatus() - { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return Optional.ofNullable(exception); - } - finally - { - lock.unlock(); - } + @Value("${consumer.client-id}") + String id; + @Autowired + Consumer> handler; + + + @KafkaListener(topics = "${consumer.topic}") + public void receive(ConsumerRecord record) + { + log.info( + "{} - {}: {}/{} - {}={}", + id, + record.offset(), + record.topic(), + record.partition(), + record.key(), + record.value() + ); + + handler.accept(record); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9f3cb81..1cb6212 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -24,6 +24,14 @@ info: group-id: ${consumer.group-id} topic: ${consumer.topic} auto-offset-reset: ${consumer.auto-offset-reset} +spring: + kafka: + consumer: + bootstrap-servers: ${consumer.bootstrap-server} + client-id: ${consumer.client-id} + auto-offset-reset: ${consumer.auto-offset-reset} + group-id: ${consumer.group-id} + value-deserializer: org.apache.kafka.common.serialization.LongDeserializer logging: level: root: INFO diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index b8e6780..c41e588 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -9,6 +9,7 @@ + diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 6f58180..cbf215e 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -13,10 +13,12 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -24,7 +26,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -37,7 +38,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.*; -@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) +@SpringJUnitConfig( + initializers = ConfigDataApplicationContextInitializer.class, + classes = { + EndlessConsumer.class, + KafkaAutoConfiguration.class, + ApplicationTests.Configuration.class }) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestPropertySource( properties = { @@ -57,16 +63,12 @@ class ApplicationTests @Autowired KafkaProducer kafkaProducer; @Autowired - KafkaConsumer kafkaConsumer; - @Autowired KafkaConsumer offsetConsumer; @Autowired ApplicationProperties properties; @Autowired - ExecutorService executor; + RecordHandler recordHandler; - Consumer> testHandler; - EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; @@ -80,7 +82,7 @@ class ApplicationTests send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i))); Set> received = new HashSet<>(); - testHandler = record -> received.add(record); + recordHandler.testHandler = record -> received.add(record); await("100 records received") .atMost(Duration.ofSeconds(30)) @@ -106,17 +108,8 @@ class ApplicationTests await("Consumer failed") .atMost(Duration.ofSeconds(30)) - .until(() -> !endlessConsumer.running()); + .untilAsserted(() -> checkSeenOffsetsForProgress()); - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); compareToCommitedOffsets(newOffsets); } @@ -215,7 +208,7 @@ class ApplicationTests @BeforeEach public void init() { - testHandler = record -> {} ; + recordHandler.testHandler = (record) -> {}; oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); @@ -226,44 +219,40 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = + recordHandler.captureOffsets = record -> - { newOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); - testHandler.accept(record); - }; - - endlessConsumer = - new EndlessConsumer<>( - executor, - properties.getClientId(), - properties.getTopic(), - kafkaConsumer, - captureOffsetAndExecuteTestHandler); - - endlessConsumer.start(); } - @AfterEach - public void deinit() + + public static class RecordHandler implements Consumer> { - try - { - endlessConsumer.stop(); - } - catch (Exception e) + Consumer> captureOffsets; + Consumer> testHandler; + + + @Override + public void accept(ConsumerRecord record) { - log.info("Exception while stopping the consumer: {}", e.toString()); + captureOffsets + .andThen(testHandler) + .accept(record); } } - @TestConfiguration @Import(ApplicationConfiguration.class) public static class Configuration { + @Primary + @Bean + public Consumer> testHandler() + { + return new RecordHandler(); + } + @Bean KafkaProducer kafkaProducer(ApplicationProperties properties) { -- 2.20.1