From 0c9a0c1cf9a0065012743efcd940d8721bc33c20 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 18:04:04 +0200 Subject: [PATCH] `EndlessConsumer` auf `@KafkaHandler` umgestellt MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Mit `@KafkaHandler` können separate Handler-Methoden für die unterschiedlichen Nachrichten-Typen definiert werden, die die Anwendung empfängt (hier: über ein Topic, auch mögich: über verschiedene Topics). * Die Tests mussten an die geänderte Implementierung angepasst werden. --- .../juplo/kafka/ApplicationConfiguration.java | 4 +- .../kafka/ApplicationHealthIndicator.java | 2 +- .../juplo/kafka/ApplicationRecordHandler.java | 29 +++------ .../java/de/juplo/kafka/EndlessConsumer.java | 61 ++++++++++++++----- .../java/de/juplo/kafka/RecordHandler.java | 14 ++++- .../juplo/kafka/GenericApplicationTests.java | 11 ++-- .../de/juplo/kafka/TestRecordHandler.java | 44 +++++++++---- 7 files changed, 109 insertions(+), 56 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index f8bf857..1755747 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -53,14 +53,14 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( + public EndlessConsumer endlessConsumer( RecordHandler recordHandler, ApplicationErrorHandler errorHandler, KafkaProperties kafkaProperties, KafkaListenerEndpointRegistry endpointRegistry) { return - new EndlessConsumer<>( + new EndlessConsumer( kafkaProperties.getClientId(), endpointRegistry, errorHandler, diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index 03a14c8..ab9782c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; + private final EndlessConsumer consumer; @Override diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 2829157..f4d3671 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import java.time.Duration; import java.util.HashMap; @@ -12,7 +11,7 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; private final Optional throttle; @@ -21,42 +20,34 @@ public class ApplicationRecordHandler implements RecordHandler private final Map state = new HashMap<>(); + @Override public void addNumber( + String topic, Integer partition, + Long offset, String user, MessageAddNumber message) { state.get(partition).addToSum(user, message.getNext()); + throttle(); } + @Override public void calculateSum( + String topic, Integer partition, + Long offset, String user, MessageCalculateSum message) { AdderResult result = state.get(partition).calculate(user); log.info("{} - New result for {}: {}", id, user, result); results.addResults(partition, user, result); + throttle(); } - @Override - public void accept(ConsumerRecord record) + private void throttle() { - Integer partition = record.partition(); - String user = record.key(); - Message message = record.value(); - - switch(message.getType()) - { - case ADD: - addNumber(partition, user, (MessageAddNumber) message); - break; - - case CALC: - calculateSum(partition, user, (MessageCalculateSum) message); - break; - } - if (throttle.isPresent()) { try diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 01397a2..655151a 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -3,8 +3,12 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; import java.util.List; import java.util.Optional; @@ -12,34 +16,63 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class EndlessConsumer +@KafkaListener( + id = "${spring.kafka.client-id}", + idIsGroup = false, + topics = "${sumup.adder.topic}", + autoStartup = "false") +public class EndlessConsumer { private final String id; private final KafkaListenerEndpointRegistry registry; private final ApplicationErrorHandler errorHandler; - private final RecordHandler recordHandler; + private final RecordHandler recordHandler; private long consumed = 0; - @KafkaListener( - id = "${spring.kafka.client-id}", - idIsGroup = false, - topics = "${sumup.adder.topic}", - autoStartup = "false") - public void accept(ConsumerRecord record) + @KafkaHandler + public void addNumber( + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Payload MessageAddNumber message) { log.info( "{} - {}: {}/{} - {}={}", id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() + offset, + topic, + partition, + key, + message ); - recordHandler.accept(record); + recordHandler.addNumber(topic, partition, offset, key, message); + + consumed++; + } + + @KafkaHandler + public void calculateSum( + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Payload MessageCalculateSum message) + { + log.info( + "{} - {}: {}/{} - {}={}", + id, + offset, + topic, + partition, + key, + message + ); + + recordHandler.calculateSum(topic, partition, offset, key, message); consumed++; } diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index 327ac9f..47f984e 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -5,6 +5,18 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.function.Consumer; -public interface RecordHandler extends Consumer> +public interface RecordHandler { + void addNumber( + String topic, + Integer partition, + Long offset, + String user, + MessageAddNumber message); + void calculateSum( + String topic, + Integer partition, + Long offset, + String user, + MessageCalculateSum message); } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 124143c..49ddb47 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -69,9 +68,9 @@ abstract class GenericApplicationTests @Autowired KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Autowired - TestRecordHandler recordHandler; + TestRecordHandler recordHandler; @Autowired - EndlessConsumer endlessConsumer; + EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; @@ -99,7 +98,7 @@ abstract class GenericApplicationTests await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -141,7 +140,7 @@ abstract class GenericApplicationTests checkSeenOffsetsForProgress(); assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedRecords.size()) + assertThat(recordHandler.receivedMessages) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -348,7 +347,7 @@ abstract class GenericApplicationTests oldOffsets = new HashMap<>(); recordHandler.seenOffsets = new HashMap<>(); - recordHandler.receivedRecords = new HashSet<>(); + recordHandler.receivedMessages = 0; doForCurrentOffsets((tp, offset) -> { diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java index 37d3f65..d9f4e67 100644 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -1,34 +1,52 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import java.util.List; import java.util.Map; -import java.util.Set; @RequiredArgsConstructor -public class TestRecordHandler implements RecordHandler +public class TestRecordHandler implements RecordHandler { - private final RecordHandler handler; + private final RecordHandler handler; Map seenOffsets; - Set> receivedRecords; + int receivedMessages; - public void onNewRecord(ConsumerRecord record) + public void onNewRecord( + String topic, + Integer partition, + Long offset, + Message messgage) { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); + seenOffsets.put(new TopicPartition(topic, partition), offset); + receivedMessages++; } @Override - public void accept(ConsumerRecord record) + public void addNumber( + String topic, + Integer partition, + Long offset, + String user, + MessageAddNumber message) { - this.onNewRecord(record); - handler.accept(record); + this.onNewRecord(topic, partition, offset, message); + handler.addNumber(topic, partition, offset, user, message); + } + + @Override + public void calculateSum( + String topic, + Integer partition, + Long offset, + String user, + MessageCalculateSum message) + { + this.onNewRecord(topic, partition, offset, message); + handler.calculateSum(topic, partition, offset, user, message); } } -- 2.20.1