From: Kai Moritz
Date: Sun, 11 Sep 2022 17:58:02 +0000 (+0200)
Subject: Simplified version of the implementation based on Spring Kafka
X-Git-Tag: sumup-adder--springified---lvm-2-tage
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fsumup-adder--springified;p=demos%2Fkafka%2Ftraining

Simplified version of the implementation based on Spring Kafka
---

diff --git a/docker-compose.yml b/docker-compose.yml
index 9850ce3..a3da553 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -139,7 +139,7 @@ services:
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
+      logging.level.org.apache.kafka.clients.consumer: INFO

   adder-2:
     image: juplo/sumup-adder-springified:1.0-SNAPSHOT
@@ -154,7 +154,7 @@ services:
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
+      logging.level.org.apache.kafka.clients.consumer: INFO

   peter:
     image: juplo/toolbox
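A note on the sumup.adder.* entries above: they are bound to a @ConfigurationProperties
class that is enabled in Application.java below. That class is not part of this diff;
the following is only a minimal sketch of what it presumably looks like, with the field
names inferred from the keys used here and from the applicationProperties.getThrottle()
call further down:

    package de.juplo.kafka;

    import org.springframework.boot.context.properties.ConfigurationProperties;

    import java.time.Duration;

    @ConfigurationProperties(prefix = "sumup.adder")
    public class ApplicationProperties
    {
      private String topic;
      private Duration throttle; // bound from "sumup.adder.throttle: 3ms"

      public String getTopic() { return topic; }
      public void setTopic(String topic) { this.topic = topic; }
      public Duration getThrottle() { return throttle; }
      public void setThrottle(Duration throttle) { this.throttle = throttle; }
    }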
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index a4d9aeb..69a9712 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -1,46 +1,112 @@
 package de.juplo.kafka;

 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.backoff.FixedBackOff;

-import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.Optional;


 @SpringBootApplication
 @Slf4j
-public class Application implements ApplicationRunner
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@EnableKafka
+public class Application
 {
-  @Autowired
-  EndlessConsumer endlessConsumer;
+  @Bean
+  public ApplicationRecordHandler applicationRecordHandler(
+      AdderResults adderResults,
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
+  {
+    return new ApplicationRecordHandler(
+        adderResults,
+        Optional.ofNullable(applicationProperties.getThrottle()),
+        kafkaProperties.getConsumer().getGroupId());
+  }
+
+  @Bean
+  public AdderResults adderResults()
+  {
+    return new AdderResults();
+  }
+
+  @Bean
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
+      AdderResults adderResults,
+      StateRepository stateRepository,
+      KafkaProperties kafkaProperties)
+  {
+    return new ApplicationRebalanceListener(
+        recordHandler,
+        adderResults,
+        stateRepository,
+        kafkaProperties.getConsumer().getGroupId());
+  }
+
+  @Bean
+  ApplicationHealthIndicator applicationHealthIndicator(
+      KafkaListenerEndpointRegistry registry,
+      KafkaProperties properties)
+  {
+    return new ApplicationHealthIndicator(
+        properties.getConsumer().getGroupId(),
+        registry);
+  }
+
+  @Bean
+  public ProducerFactory<String, Object> producerFactory(
+      KafkaProperties properties)
+  {
+    return new DefaultKafkaProducerFactory<>(
+        properties.getProducer().buildProperties(),
+        new StringSerializer(),
+        new DelegatingByTypeSerializer(
+            Map.of(
+                byte[].class, new ByteArraySerializer(),
+                MessageAddNumber.class, new JsonSerializer<>(),
+                MessageCalculateSum.class, new JsonSerializer<>())));
+  }
+
+  @Bean
+  public KafkaTemplate<String, Object> kafkaTemplate(
+      ProducerFactory<String, Object> producerFactory)
+  {
+    return new KafkaTemplate<>(producerFactory);
+  }

-  @Override
-  public void run(ApplicationArguments args) throws Exception
+  @Bean
+  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+      KafkaOperations<String, Object> kafkaTemplate)
   {
-    log.info("Starting EndlessConsumer");
-    endlessConsumer.start();
+    return new DeadLetterPublishingRecoverer(kafkaTemplate);
   }

-  @PreDestroy
-  public void shutdown()
+  @Bean
+  public DefaultErrorHandler errorHandler(
+      DeadLetterPublishingRecoverer recoverer)
   {
-    try
-    {
-      log.info("Stopping EndlessConsumer");
-      endlessConsumer.stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("Was already stopped: {}", e.toString());
-    }
-    catch (Exception e)
-    {
-      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
-    }
+    return new DefaultErrorHandler(
+        recoverer,
+        new FixedBackOff(0L, 0L));
   }
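The last three beans are the interesting part of the new setup: Spring Boot's
auto-configured listener-container factory picks up a unique DefaultErrorHandler bean
automatically, and FixedBackOff(0L, 0L) disables retries, so a record that cannot be
deserialized or processed is published to the matching .DLT topic right away by the
DeadLetterPublishingRecoverer. A minimal sketch of the resulting behaviour (listener id
and topic name are made up for illustration):

    import org.springframework.kafka.annotation.KafkaListener;

    public class FailingListenerSketch
    {
      // With the errorHandler bean above in place, a record that makes this
      // method throw is forwarded to "numbers.DLT" (same partition, by
      // default) instead of being retried.
      @KafkaListener(id = "failing", topics = "numbers")
      public void listen(String value)
      {
        throw new RuntimeException("cannot handle: " + value);
      }
    }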
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
deleted file mode 100644
index b5f6187..0000000
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.Map;
-import java.util.Optional;
-
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
-import org.springframework.kafka.listener.DefaultErrorHandler;
-import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.backoff.FixedBackOff;
-
-
-@Configuration
-@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
-public class ApplicationConfiguration
-{
-  @Bean
-  public ApplicationRecordHandler applicationRecordHandler(
-      AdderResults adderResults,
-      KafkaProperties kafkaProperties,
-      ApplicationProperties applicationProperties)
-  {
-    return new ApplicationRecordHandler(
-        adderResults,
-        Optional.ofNullable(applicationProperties.getThrottle()),
-        kafkaProperties.getClientId());
-  }
-
-  @Bean
-  public AdderResults adderResults()
-  {
-    return new AdderResults();
-  }
-
-  @Bean
-  public ApplicationRebalanceListener rebalanceListener(
-      ApplicationRecordHandler recordHandler,
-      AdderResults adderResults,
-      StateRepository stateRepository,
-      KafkaProperties kafkaProperties)
-  {
-    return new ApplicationRebalanceListener(
-        recordHandler,
-        adderResults,
-        stateRepository,
-        kafkaProperties.getClientId());
-  }
-
-  @Bean
-  public EndlessConsumer endlessConsumer(
-      RecordHandler recordHandler,
-      KafkaProperties kafkaProperties,
-      KafkaListenerEndpointRegistry endpointRegistry)
-  {
-    return
-        new EndlessConsumer(
-            kafkaProperties.getClientId(),
-            endpointRegistry,
-            recordHandler);
-  }
-
-  @Bean
-  public ProducerFactory<String, Object> producerFactory(
-      KafkaProperties properties)
-  {
-    return new DefaultKafkaProducerFactory<>(
-        properties.getProducer().buildProperties(),
-        new StringSerializer(),
-        new DelegatingByTypeSerializer(
-            Map.of(
-                byte[].class, new ByteArraySerializer(),
-                MessageAddNumber.class, new JsonSerializer<>(),
-                MessageCalculateSum.class, new JsonSerializer<>())));
-  }
-
-  @Bean
-  public KafkaTemplate<String, Object> kafkaTemplate(
-      ProducerFactory<String, Object> producerFactory)
-  {
-    return new KafkaTemplate<>(producerFactory);
-  }
-
-  @Bean
-  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
-      KafkaOperations<String, Object> kafkaTemplate)
-  {
-    return new DeadLetterPublishingRecoverer(kafkaTemplate);
-  }
-
-  @Bean
-  public DefaultErrorHandler errorHandler(
-      DeadLetterPublishingRecoverer recoverer)
-  {
-    return new DefaultErrorHandler(
-        recoverer,
-        new FixedBackOff(0L, 0L));
-  }
-}
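Everything this class configured now lives in Application.java; the EndlessConsumer bean
and its hand-written start/stop lifecycle are gone entirely, because the @KafkaListener
container is started and stopped by Spring Kafka itself. Should programmatic control
still be needed, the registry offers the same operations; a short sketch (the listener
id equals the consumer group id, my-group in application.yml, as wired above):

    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;

    public class ListenerLifecycleSketch
    {
      private final KafkaListenerEndpointRegistry registry;

      public ListenerLifecycleSketch(KafkaListenerEndpointRegistry registry)
      {
        this.registry = registry;
      }

      // Replaces EndlessConsumer.start()/stop()
      public void restart()
      {
        MessageListenerContainer container = registry.getListenerContainer("my-group");
        container.stop();
        container.start();
      }
    }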
diff --git a/src/main/java/de/juplo/kafka/ApplicationController.java b/src/main/java/de/juplo/kafka/ApplicationController.java
new file mode 100644
index 0000000..0a9890c
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ApplicationController.java
@@ -0,0 +1,39 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.data.mongodb.core.aggregation.ArithmeticOperators;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+@RestController
+@RequiredArgsConstructor
+public class ApplicationController
+{
+  private final AdderResults results;
+
+
+  @GetMapping("results")
+  public Map<Integer, Map<String, List<AdderResult>>> results()
+  {
+    return results.getState();
+  }
+
+  @GetMapping("results/{user}")
+  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
+  {
+    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
+    {
+      List<AdderResult> results = resultsByUser.get(user);
+      if (results != null)
+        return ResponseEntity.ok(results);
+    }
+
+    return ResponseEntity.notFound().build();
+  }
+}
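AdderResults itself is not touched by this commit. Judging from the return types used
above, it is essentially a map of finished calculations per partition and user; the
following is only a hypothetical minimal version for orientation (the real class also
needs methods to record results and to hand partitions over on rebalances):

    package de.juplo.kafka;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch, inferred from ApplicationController above
    public class AdderResults
    {
      private final Map<Integer, Map<String, List<AdderResult>>> state = new HashMap<>();

      public Map<Integer, Map<String, List<AdderResult>>> getState()
      {
        return state;
      }
    }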
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index e215c69..0466df4 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -3,20 +3,20 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import org.springframework.boot.actuate.health.Health;
 import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.stereotype.Component;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;


-@Component
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer consumer;
+  private final String id;
+  private final KafkaListenerEndpointRegistry registry;


   @Override
   public Health health()
   {
-    return consumer.running()
+    return registry.getListenerContainer(id).isRunning()
       ? Health.up().build()
       : Health.down().build();
   }
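One caveat with the new health check: KafkaListenerEndpointRegistry.getListenerContainer()
returns null as long as no listener with the given id has been registered, so health()
can fail with a NullPointerException during early startup. A more defensive variant
(MessageListenerContainer comes from org.springframework.kafka.listener):

    @Override
    public Health health()
    {
      // getListenerContainer() may return null before the container exists;
      // report DOWN in that case instead of throwing
      MessageListenerContainer container = registry.getListenerContainer(id);
      return container != null && container.isRunning()
          ? Health.up().build()
          : Health.down().build();
    }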
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
index f4d3671..2075781 100644
--- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
+++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
@@ -2,6 +2,11 @@ package de.juplo.kafka;

 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;

 import java.time.Duration;
 import java.util.HashMap;
@@ -11,7 +16,10 @@ import java.util.Optional;

 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler
+@KafkaListener(
+    id = "${spring.kafka.consumer.group-id}",
+    topics = "${sumup.adder.topic}")
+public class ApplicationRecordHandler
 {
   private final AdderResults results;
   private final Optional<Duration> throttle;
@@ -20,24 +28,27 @@ public class ApplicationRecordHandler implements RecordHandler

   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();


-  @Override
+  @KafkaHandler
   public void addNumber(
-      String topic,
+      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
       Integer partition,
-      Long offset,
+      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
       String user,
+      @Payload
       MessageAddNumber message)
   {
+    log.debug("{} - Received {} for {} on {}", id, message, user, partition);
     state.get(partition).addToSum(user, message.getNext());
     throttle();
   }

-  @Override
+  @KafkaHandler
   public void calculateSum(
-      String topic,
+      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
       Integer partition,
-      Long offset,
+      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
       String user,
+      @Payload
       MessageCalculateSum message)
   {
     AdderResult result = state.get(partition).calculate(user);
@@ -70,15 +81,4 @@ public class ApplicationRecordHandler
   {
     return this.state.remove(partition).getState();
   }
-
-
-  public Map<Integer, AdderBusinessLogic> getState()
-  {
-    return state;
-  }
-
-  public AdderBusinessLogic getState(Integer partition)
-  {
-    return state.get(partition);
-  }
 }
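The record handler is now the listener itself: the class-level @KafkaListener declares
id and topic once, and Spring Kafka dispatches each record to the matching @KafkaHandler
method based on the type of the deserialized payload (resolved here from the __TypeId__
header that the JSON serializer writes on the producer side). The pattern in isolation,
with a made-up listener id and topic:

    import org.springframework.kafka.annotation.KafkaHandler;
    import org.springframework.kafka.annotation.KafkaListener;

    // One listener class, one handler method per payload type; the type of
    // the deserialized record decides which method is invoked.
    @KafkaListener(id = "sketch", topics = "sketch-topic")
    public class MultiHandlerSketch
    {
      @KafkaHandler
      public void on(MessageAddNumber message) { /* handle ADD */ }

      @KafkaHandler
      public void on(MessageCalculateSum message) { /* handle CALC */ }
    }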
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
deleted file mode 100644
index 26a5bc8..0000000
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-
-@RestController
-@RequiredArgsConstructor
-public class DriverController
-{
-  private final EndlessConsumer consumer;
-  private final ApplicationRecordHandler recordHandler;
-  private final AdderResults results;
-
-
-  @PostMapping("start")
-  public void start()
-  {
-    consumer.start();
-  }
-
-  @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
-  {
-    consumer.stop();
-  }
-
-
-  @GetMapping("state")
-  public Map<Integer, Map<String, Long>> state()
-  {
-    return
-        recordHandler
-            .getState()
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                entry -> entry.getKey(),
-                entry -> entry.getValue().getState()));
-  }
-
-  @GetMapping("state/{user}")
-  public ResponseEntity<Long> state(@PathVariable String user)
-  {
-    for (AdderBusinessLogic adder : recordHandler.getState().values())
-    {
-      Optional<Long> sum = adder.getSum(user);
-      if (sum.isPresent())
-        return ResponseEntity.ok(sum.get());
-    }
-
-    return ResponseEntity.notFound().build();
-  }
-
-  @GetMapping("results")
-  public Map<Integer, Map<String, List<AdderResult>>> results()
-  {
-    return results.getState();
-  }
-
-  @GetMapping("results/{user}")
-  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
-  {
-    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
-    {
-      List<AdderResult> results = resultsByUser.get(user);
-      if (results != null)
-        return ResponseEntity.ok(results);
-    }
-
-    return ResponseEntity.notFound().build();
-  }
-
-
-  @ExceptionHandler
-  @ResponseStatus(HttpStatus.BAD_REQUEST)
-  public ErrorResponse illegalStateException(IllegalStateException e)
-  {
-    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
deleted file mode 100644
index 27c1e44..0000000
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.annotation.KafkaHandler;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-
-import java.util.List;
-import java.util.Optional;
-
-
-@RequiredArgsConstructor
-@Slf4j
-@KafkaListener(
-    id = "${spring.kafka.client-id}",
-    idIsGroup = false,
-    topics = "${sumup.adder.topic}",
-    autoStartup = "false")
-public class EndlessConsumer
-{
-  private final String id;
-  private final KafkaListenerEndpointRegistry registry;
-  private final RecordHandler recordHandler;
-
-  private long consumed = 0;
-
-
-  @KafkaHandler
-  public void addNumber(
-      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
-      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
-      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
-      @Header(KafkaHeaders.OFFSET) Long offset,
-      @Payload MessageAddNumber message)
-  {
-    log.info(
-        "{} - {}: {}/{} - {}={}",
-        id,
-        offset,
-        topic,
-        partition,
-        key,
-        message
-    );
-
-    recordHandler.addNumber(topic, partition, offset, key, message);
-
-    consumed++;
-  }
-
-  @KafkaHandler
-  public void calculateSum(
-      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
-      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
-      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
-      @Header(KafkaHeaders.OFFSET) Long offset,
-      @Payload MessageCalculateSum message)
-  {
-    log.info(
-        "{} - {}: {}/{} - {}={}",
-        id,
-        offset,
-        topic,
-        partition,
-        key,
-        message
-    );
-
-    recordHandler.calculateSum(topic, partition, offset, key, message);
-
-    consumed++;
-  }
-
-  public void start()
-  {
-    if (running())
-      throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-    log.info("{} - Starting - consumed {} messages before", id, consumed);
-    registry.getListenerContainer(id).start();
-  }
-
-  public void stop()
-  {
-    if (!running())
-      throw new IllegalStateException("Consumer instance " + id + " is not running!");
-
-    log.info("{} - Stopping", id);
-    registry.getListenerContainer(id).stop();
-    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
-  }
-
-  public boolean running()
-  {
-    return registry.getListenerContainer(id).isRunning();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
deleted file mode 100644
index 5ca206d..0000000
--- a/src/main/java/de/juplo/kafka/ErrorResponse.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.Value;
-
-
-@Value
-public class ErrorResponse
-{
-  private final String error;
-  private final Integer status;
-}
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
deleted file mode 100644
index 47f984e..0000000
--- a/src/main/java/de/juplo/kafka/RecordHandler.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.function.Consumer;
-
-
-public interface RecordHandler
-{
-  void addNumber(
-      String topic,
-      Integer partition,
-      Long offset,
-      String user,
-      MessageAddNumber message);
-
-  void calculateSum(
-      String topic,
-      Integer partition,
-      Long offset,
-      String user,
-      MessageCalculateSum message);
-}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 2f6f859..a95e976 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -14,13 +14,6 @@ management:
       enabled: true
     java:
       enabled: true
-info:
-  kafka:
-    bootstrap-server: ${spring.kafka.consumer.bootstrap-servers}
-    client-id: ${spring.kafka.consumer.client-id}
-    group-id: ${spring.kafka.consumer.group-id}
-    topic: ${consumer.topic}
-    auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
 spring:
   data:
     mongodb:
@@ -28,11 +21,8 @@ spring:
       database: juplo
   kafka:
     bootstrap-servers: :9092
-    client-id: DEV
     consumer:
       group-id: my-group
-      auto-offset-reset: earliest
-      auto-commit-interval: 5s
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
       properties:
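The consumer deliberately keeps the ErrorHandlingDeserializer: a plain JSON deserializer
would throw inside poll() on a corrupt record and stall the container, whereas the
wrapper catches the failure and passes it on to the error handler as a
DeserializationException, which the DeadLetterPublishingRecoverer then routes to the
dead-letter topic. Expressed programmatically instead of in YAML (the JSON delegate is
an assumption based on the producer side of this setup):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    public class DeserializerConfigSketch
    {
      public static Map<String, Object> consumerProperties()
      {
        Map<String, Object> props = new HashMap<>();
        // the wrapper is the configured deserializer...
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // ...and delegates to the real one, so a poison pill surfaces as a
        // DeserializationException in the error handler instead of killing poll()
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        return props;
      }
    }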
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
deleted file mode 100644
index e01fdd1..0000000
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ /dev/null
@@ -1,195 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class ApplicationTests extends GenericApplicationTests
-{
-  @Autowired
-  StateRepository stateRepository;
-
-
-  public ApplicationTests()
-  {
-    super(new ApplicationTestRecrodGenerator());
-    ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
-  }
-
-
-  static class ApplicationTestRecrodGenerator implements RecordGenerator
-  {
-    ApplicationTests tests;
-
-    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
-    final String[] dieWilden13 =
-        IntStream
-            .range(1, 14)
-            .mapToObj(i -> "seeräuber-" + i)
-            .toArray(i -> new String[i]);
-    final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
-
-    int counterMessages;
-    int counterPoisonPills;
-    int counterLogicErrors;
-
-    Map<String, List<AdderResult>> state;
-
-    @Override
-    public void generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counterMessages = 0;
-      counterPoisonPills = 0;
-      counterLogicErrors = 0;
-
-      state =
-          Arrays
-              .stream(dieWilden13)
-              .collect(Collectors.toMap(
-                  seeräuber -> seeräuber,
-                  seeräuber -> new LinkedList<>()));
-
-      int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int next = 0;
-
-      for (int pass = 0; pass < 333; pass++)
-      {
-        for (int i = 0; i < 13; i++)
-        {
-          String seeräuber = dieWilden13[i];
-          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-          if (message[i] > number[i])
-          {
-            send(
-                key,
-                calculateMessage,
-                Message.Type.CALC,
-                poisonPill(poisonPills, pass, counterMessages),
-                logicError(logicErrors, pass, counterMessages),
-                messageSender);
-            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
-            // Pick next number to calculate
-            number[i] = numbers[next++%numbers.length];
-            message[i] = 1;
-            log.debug("Pirate {} will calculate the sum for {} next", seeräuber, number[i]);
-          }
-
-          send(
-              key,
-              new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
-              Message.Type.ADD,
-              poisonPill(poisonPills, pass, counterMessages),
-              logicError(logicErrors, pass, counterMessages),
-              messageSender);
-        }
-      }
-    }
-
-    @Override
-    public int getNumberOfMessages()
-    {
-      return counterMessages;
-    }
-
-    @Override
-    public int getNumberOfPoisonPills()
-    {
-      return counterPoisonPills;
-    }
-
-    @Override
-    public int getNumberOfLogicErrors()
-    {
-      return counterLogicErrors;
-    }
-
-    boolean poisonPill(boolean poisonPills, int pass, int counter)
-    {
-      return poisonPills && pass > 300 && counter%99 == 0;
-    }
-
-    boolean logicError(boolean logicErrors, int pass, int counter)
-    {
-      return logicErrors && pass > 300 && counter%77 == 0;
-    }
-
-    void send(
-        Bytes key,
-        Bytes value,
-        Message.Type type,
-        boolean poisonPill,
-        boolean logicError,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counterMessages++;
-
-      if (logicError)
-      {
-        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
-        counterLogicErrors++;
-      }
-      if (poisonPill)
-      {
-        value = new Bytes("BOOM!".getBytes());
-        counterPoisonPills++;
-      }
-
-      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
-      record.headers().add("__TypeId__", type.toString().getBytes());
-      messageSender.accept(record);
-    }
-
-    @Override
-    public void assertBusinessLogic()
-    {
-      for (int i = 0; i < PARTITIONS; i++)
-      {
-        StateDocument stateDocument =
-            tests.stateRepository.findById(Integer.toString(i)).get();
-
-        stateDocument
-            .results
-            .entrySet()
-            .stream()
-            .forEach(entry ->
-            {
-              String user = entry.getKey();
-              List<AdderResult> resultsForUser = entry.getValue();
-
-              for (int j = 0; j < resultsForUser.size(); j++)
-              {
-                if (!(j < state.get(user).size()))
-                {
-                  break;
-                }
-
-                assertThat(resultsForUser.get(j))
-                    .as("Unexpected results calculation %d of user %s", j, user)
-                    .isEqualTo(state.get(user).get(j));
-              }
-
-              assertThat(state.get(user))
-                  .as("More results calculated for user %s than expected", user)
-                  .containsAll(resultsForUser);
-            });
-      }
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
deleted file mode 100644
index ac8c65d..0000000
--- a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.Message;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-
-@Slf4j
-public class DeadLetterTopicConsumer
-{
-  List<Message<?>> messages = new LinkedList<>();
-
-
-  @KafkaListener(
-      id = "DLT",
-      topics = "${sumup.adder.topic}.DLT",
-      containerFactory = "dltContainerFactory")
-  public void receive(Message<?> message)
-  {
-    log.info(
-        "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
-        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
-        message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
-        message.getHeaders().get(KafkaHeaders.OFFSET),
-        message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
-        new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
-        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
-        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
-        message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
-        message.getPayload(),
-        new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
-
-    messages.add(message);
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
deleted file mode 100644
index 606218f..0000000
--- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ConditionEvaluationResult;
-import org.junit.jupiter.api.extension.ExecutionCondition;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.platform.commons.util.AnnotationUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
-{
-  @Override
-  public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context)
-  {
-    final Optional<SkipWhenErrorCannotBeGenerated> optional =
-        AnnotationUtils.findAnnotation(
-            context.getElement(),
-            SkipWhenErrorCannotBeGenerated.class);
-
-    if (context.getTestInstance().isEmpty())
-      return ConditionEvaluationResult.enabled("Test instance is not available");
-
-    if (optional.isPresent())
-    {
-      SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
-      GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
-      List<String> missingRequiredErrors = new LinkedList<>();
-
-      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill())
-        missingRequiredErrors.add("Poison-Pill");
-
-      if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
-        missingRequiredErrors.add("Logic-Error");
-
-      StringBuilder builder = new StringBuilder();
-      builder.append(context.getTestClass().get().getSimpleName());
-
-      if (missingRequiredErrors.isEmpty())
-      {
-        builder.append(" can generate all required types of errors");
-        return ConditionEvaluationResult.enabled(builder.toString());
-      }
-
-      builder.append(" cannot generate the required error(s): ");
-      builder.append(
-          missingRequiredErrors
-              .stream()
-              .collect(Collectors.joining(", ")));
-
-      return ConditionEvaluationResult.disabled(builder.toString());
-    }
-
-    return ConditionEvaluationResult.enabled(
-        "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName());
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
deleted file mode 100644
index ac8a629..0000000
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ /dev/null
@@ -1,440 +0,0 @@
-package de.juplo.kafka;
-
-import com.mongodb.client.MongoClient;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.*;
-import org.apache.kafka.common.utils.Bytes;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.autoconfigure.mongo.MongoProperties;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-
-import java.time.Duration;
-import java.util.*;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
-import static de.juplo.kafka.GenericApplicationTests.TOPIC;
-import static org.assertj.core.api.Assertions.*;
-import static org.awaitility.Awaitility.*;
-
-
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
-@TestPropertySource(
-    properties = {
-        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "sumup.adder.topic=" + TOPIC,
-        "spring.kafka.consumer.auto-commit-interval=500ms",
-        "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@Slf4j
-abstract class GenericApplicationTests
-{
-  public static final String TOPIC = "FOO";
-  public static final int PARTITIONS = 10;
-
-
-  @Autowired
-  org.apache.kafka.clients.consumer.Consumer<Bytes, Bytes> kafkaConsumer;
-  @Autowired
-  KafkaProperties kafkaProperties;
-  @Autowired
-  ApplicationProperties applicationProperties;
-  @Autowired
-  MongoClient mongoClient;
-  @Autowired
-  MongoProperties mongoProperties;
-  @Autowired
-  KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
-  @Autowired
-  TestRecordHandler recordHandler;
-  @Autowired
-  DeadLetterTopicConsumer deadLetterTopicConsumer;
-  @Autowired
-  EndlessConsumer endlessConsumer;
-
-  KafkaProducer<Bytes, Bytes> testRecordProducer;
-  KafkaConsumer<Bytes, Bytes> offsetConsumer;
-  Map<TopicPartition, Long> oldOffsets;
-
-
-  final RecordGenerator recordGenerator;
-  final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
-
-  public GenericApplicationTests(RecordGenerator recordGenerator)
-  {
-    this.recordGenerator = recordGenerator;
-    this.messageSender = (record) -> sendMessage(record);
-  }
-
-
-  /** Test methods */
-
-  @Test
-  void commitsCurrentOffsetsOnSuccess() throws Exception
-  {
-    recordGenerator.generate(false, false, messageSender);
-
-    int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
-
-    await(numberOfGeneratedMessages + " records received")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages);
-
-    await("Offsets committed")
-        .atMost(Duration.ofSeconds(10))
-        .pollInterval(Duration.ofSeconds(1))
-        .untilAsserted(() ->
-        {
-          checkSeenOffsetsForProgress();
-          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-        });
-
-    assertThat(endlessConsumer.running())
-        .describedAs("Consumer should still be running")
-        .isTrue();
-
-    endlessConsumer.stop();
-    recordGenerator.assertBusinessLogic();
-  }
-
-  @Test
-  @SkipWhenErrorCannotBeGenerated(poisonPill = true)
-  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
-  {
-    recordGenerator.generate(true, false, messageSender);
-
-    int numberOfValidMessages =
-        recordGenerator.getNumberOfMessages() -
-        recordGenerator.getNumberOfPoisonPills();
-
-    await(numberOfValidMessages + " records received")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
-    await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills());
-
-    await("Offsets committed")
-        .atMost(Duration.ofSeconds(10))
-        .pollInterval(Duration.ofSeconds(1))
-        .untilAsserted(() ->
-        {
-          checkSeenOffsetsForProgress();
-          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-        });
-
-    assertThat(endlessConsumer.running())
-        .describedAs("Consumer should still be running")
-        .isTrue();
-
-    endlessConsumer.stop();
-    recordGenerator.assertBusinessLogic();
-  }
-
-  @Test
-  @SkipWhenErrorCannotBeGenerated(logicError = true)
-  void commitsOffsetsOfUnseenRecordsOnLogicError()
-  {
-    recordGenerator.generate(false, true, messageSender);
-
-    int numberOfValidMessages =
-        recordGenerator.getNumberOfMessages() -
-        recordGenerator.getNumberOfLogicErrors();
-
-    await(numberOfValidMessages + " records received")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
-    await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
-
-    await("Offsets committed")
-        .atMost(Duration.ofSeconds(10))
-        .pollInterval(Duration.ofSeconds(1))
-        .untilAsserted(() ->
-        {
-          checkSeenOffsetsForProgress();
-          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-        });
-
-    assertThat(endlessConsumer.running())
-        .describedAs("Consumer should still be running")
-        .isTrue();
-
-    endlessConsumer.stop();
-    recordGenerator.assertBusinessLogic();
-  }
-
-
-  /** Helper methods for the verification of expectations */
-
-  void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-  {
-    doForCurrentOffsets((tp, offset) ->
-    {
-      Long expected = offsetsToCheck.get(tp) + 1;
-      log.debug("Checking if the offset {} for {} is exactly {}", offset, tp, expected);
-      assertThat(offset)
-          .describedAs("Committed offset corresponds to the offset of the consumer")
-          .isEqualTo(expected);
-    });
-  }
-
-  void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-  {
-    List<Boolean> isOffsetBehindSeen = new LinkedList<>();
-
-    doForCurrentOffsets((tp, offset) ->
-    {
-      Long expected = offsetsToCheck.get(tp) + 1;
-      log.debug("Checking if the offset {} for {} is at most {}", offset, tp, expected);
-      assertThat(offset)
-          .describedAs("Committed offset must be at most equal to the offset of the consumer")
-          .isLessThanOrEqualTo(expected);
-      isOffsetBehindSeen.add(offset < expected);
-    });
-
-    assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
-        .describedAs("Committed offsets are behind seen offsets")
-        .isTrue();
-  }
-
-  void checkSeenOffsetsForProgress()
-  {
-    // Be sure that some messages were consumed...!
-    Set<TopicPartition> withProgress = new HashSet<>();
-    partitions().forEach(tp ->
-    {
-      Long oldOffset = oldOffsets.get(tp) + 1;
-      Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
-      if (!oldOffset.equals(newOffset))
-      {
-        log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-        withProgress.add(tp);
-      }
-    });
-    assertThat(withProgress)
-        .describedAs("Some offsets must have changed, compared to the old offset-positions")
-        .isNotEmpty();
-  }
-
-
-  /** Helper methods for setting up and running the tests */
-
-  void seekToEnd()
-  {
-    offsetConsumer.assign(partitions());
-    offsetConsumer.seekToEnd(partitions());
-    partitions().forEach(tp ->
-    {
-      // seekToEnd() works lazily: it only takes effect on poll()/position()
-      Long offset = offsetConsumer.position(tp);
-      log.info("New position for {}: {}", tp, offset);
-    });
-    // The new positions must be committed!
-    offsetConsumer.commitSync();
-    offsetConsumer.unsubscribe();
-  }
-
-  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
-  {
-    offsetConsumer.assign(partitions());
-    partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-    offsetConsumer.unsubscribe();
-  }
-
-  List<TopicPartition> partitions()
-  {
-    return
-        IntStream
-            .range(0, PARTITIONS)
-            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
-            .collect(Collectors.toList());
-  }
-
-
-  public interface RecordGenerator
-  {
-    void generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
-
-    int getNumberOfMessages();
-    int getNumberOfPoisonPills();
-    int getNumberOfLogicErrors();
-
-    default boolean canGeneratePoisonPill()
-    {
-      return true;
-    }
-
-    default boolean canGenerateLogicError()
-    {
-      return true;
-    }
-
-    default void assertBusinessLogic()
-    {
-      log.debug("No business-logic to assert");
-    }
-  }
-
-  void sendMessage(ProducerRecord<Bytes, Bytes> record)
-  {
-    testRecordProducer.send(record, (metadata, e) ->
-    {
-      if (metadata != null)
-      {
-        log.debug(
-            "{}|{} - {}={}",
-            metadata.partition(),
-            metadata.offset(),
-            record.key(),
-            record.value());
-      }
-      else
-      {
-        log.warn(
-            "Exception for {}={}: {}",
-            record.key(),
-            record.value(),
-            e.toString());
-      }
-    });
-  }
-
-
-  @BeforeEach
-  public void init()
-  {
-    Properties props;
-    props = new Properties();
-    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-    props.put("linger.ms", 100);
-    props.put("key.serializer", BytesSerializer.class.getName());
-    props.put("value.serializer", BytesSerializer.class.getName());
-    testRecordProducer = new KafkaProducer<>(props);
-
-    props = new Properties();
-    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-    props.put("client.id", "OFFSET-CONSUMER");
-    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
-    props.put("key.deserializer", BytesDeserializer.class.getName());
-    props.put("value.deserializer", BytesDeserializer.class.getName());
-    offsetConsumer = new KafkaConsumer<>(props);
-
-    mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
-    seekToEnd();
-
-    oldOffsets = new HashMap<>();
-    recordHandler.seenOffsets = new HashMap<>();
-    recordHandler.receivedMessages = 0;
-
-    deadLetterTopicConsumer.messages.clear();
-
-    doForCurrentOffsets((tp, offset) ->
-    {
-      oldOffsets.put(tp, offset - 1);
-      recordHandler.seenOffsets.put(tp, offset - 1);
-    });
-
-    endlessConsumer.start();
-  }
-
-  @AfterEach
-  public void deinit()
-  {
-    try
-    {
-      endlessConsumer.stop();
-    }
-    catch (Exception e)
-    {
-      log.debug("{}", e.toString());
-    }
-
-    try
-    {
-      testRecordProducer.close();
-      offsetConsumer.close();
-    }
-    catch (Exception e)
-    {
-      log.info("Exception while stopping the consumer: {}", e.toString());
-    }
-  }
-
-
-  @TestConfiguration
-  @Import(ApplicationConfiguration.class)
-  public static class Configuration
-  {
-    @Bean
-    public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
-    {
-      return new TestRecordHandler(applicationRecordHandler);
-    }
-
-    @Bean(destroyMethod = "close")
-    public org.apache.kafka.clients.consumer.Consumer<Bytes, Bytes> kafkaConsumer(ConsumerFactory<Bytes, Bytes> factory)
-    {
-      return factory.createConsumer();
-    }
-
-    @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
-        KafkaProperties properties)
-    {
-      Map<String, Object> consumerProperties = new HashMap<>();
-
-      consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
-      consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-      consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-      consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-      DefaultKafkaConsumerFactory<String, String> dltConsumerFactory =
-          new DefaultKafkaConsumerFactory<>(consumerProperties);
-      ConcurrentKafkaListenerContainerFactory<String, String> factory =
-          new ConcurrentKafkaListenerContainerFactory<>();
-      factory.setConsumerFactory(dltConsumerFactory);
-      return factory;
-    }
-
-    @Bean
-    public DeadLetterTopicConsumer deadLetterTopicConsumer()
-    {
-      return new DeadLetterTopicConsumer();
-    }
-  }
-}
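A detail of the deleted test setup worth spelling out: the seekToEnd()/commitSync() pair
in init() pins the consumer group's committed offsets to the current log end before
every test, so the offset assertions afterwards start from a known baseline. The trick
in isolation, using the plain kafka-clients API exactly as the offsetConsumer above does:

    // offsetConsumer and partitions as in GenericApplicationTests.init()
    offsetConsumer.assign(partitions);
    offsetConsumer.seekToEnd(partitions);
    for (TopicPartition tp : partitions)
    {
      offsetConsumer.position(tp); // seekToEnd() is lazy; position() forces the seek
    }
    offsetConsumer.commitSync();   // commit the end offsets for the group
    offsetConsumer.unsubscribe();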
diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
deleted file mode 100644
index 6d15e9e..0000000
--- a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@ExtendWith(ErrorCannotBeGeneratedCondition.class)
-public @interface SkipWhenErrorCannotBeGenerated
-{
-  boolean poisonPill() default false;
-  boolean logicError() default false;
-}
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
deleted file mode 100644
index d9f4e67..0000000
--- a/src/test/java/de/juplo/kafka/TestRecordHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-public class TestRecordHandler implements RecordHandler
-{
-  private final RecordHandler handler;
-
-  Map<TopicPartition, Long> seenOffsets;
-  int receivedMessages;
-
-
-  public void onNewRecord(
-      String topic,
-      Integer partition,
-      Long offset,
-      Message message)
-  {
-    seenOffsets.put(new TopicPartition(topic, partition), offset);
-    receivedMessages++;
-  }
-
-  @Override
-  public void addNumber(
-      String topic,
-      Integer partition,
-      Long offset,
-      String user,
-      MessageAddNumber message)
-  {
-    this.onNewRecord(topic, partition, offset, message);
-    handler.addNumber(topic, partition, offset, user, message);
-  }
-
-  @Override
-  public void calculateSum(
-      String topic,
-      Integer partition,
-      Long offset,
-      String user,
-      MessageCalculateSum message)
-  {
-    this.onNewRecord(topic, partition, offset, message);
-    handler.calculateSum(topic, partition, offset, user, message);
-  }
-}