From: Kai Moritz Date: Tue, 1 Nov 2022 18:44:39 +0000 (+0100) Subject: WIP X-Git-Tag: simple-consumer--json-DEPRECATED~28 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=377840107151d9c270f7e3a91a118dce4aa1295f;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java deleted file mode 100644 index d525182..0000000 --- a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.juplo.kafka; - - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -public class AdderBusinessLogic -{ - private final Map state; - - - public AdderBusinessLogic() - { - this(new HashMap<>()); - } - - public AdderBusinessLogic(Map state) - { - this.state = state; - } - - - public synchronized Optional getSum(String user) - { - return Optional.ofNullable(state.get(user)).map(result -> result.sum); - } - - public synchronized void addToSum(String user, Integer value) - { - if (value == null || value < 1) - throw new IllegalArgumentException("Not a positive number: " + value); - - long sum = - Optional - .ofNullable(state.get(user)) - .map(result -> result.sum) - .orElse(0l); - state.put(user, new AdderResult(value, sum + value)); - } - - public synchronized AdderResult calculate(String user) - { - if (!state.containsKey(user)) - throw new IllegalStateException("No sumation for " + user + " in progress"); - - return state.remove(user); - } - - protected Map getState() - { - return state; - } -} diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java deleted file mode 100644 index 44b7da8..0000000 --- a/src/main/java/de/juplo/kafka/AdderResult.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; - - -@RequiredArgsConstructor -@Getter -@EqualsAndHashCode -public class AdderResult -{ - final int number; - final long sum; - - @Override - public String toString() - { - return "sum(" + number + ") = " + sum; - } -} diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java deleted file mode 100644 index e7f5602..0000000 --- a/src/main/java/de/juplo/kafka/AdderResults.java +++ /dev/null @@ -1,47 +0,0 @@ -package de.juplo.kafka; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - - -public class AdderResults -{ - private final Map>> results = new HashMap<>(); - - - public void addResults(Integer partition, String user, AdderResult result) - { - Map> resultsByUser = this.results.get(partition); - - List results = resultsByUser.get(user); - if (results == null) - { - results = new LinkedList<>(); - resultsByUser.put(user, results); - } - - results.add(result); - } - - protected void addPartition(Integer partition, Map> results) - { - this.results.put(partition, results); - } - - protected Map> removePartition(Integer partition) - { - return this.results.remove(partition); - } - - public Map>> getState() - { - return results; - } - - public Map> getState(Integer partition) - { - return results.get(partition); - } -} diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 76c2520..b4a960d 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,11 +1,14 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.ConsumerFactory; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; @@ -17,57 +20,29 @@ import java.util.concurrent.TimeUnit; public class Application implements ApplicationRunner { @Autowired - EndlessConsumer endlessConsumer; + Consumer consumer; @Autowired - ExecutorService executor; + SimpleConsumer simpleConsumer; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting EndlessConsumer"); - endlessConsumer.start(); + simpleConsumer.start(); } @PreDestroy public void shutdown() { - try - { - log.info("Stopping EndlessConsumer"); - endlessConsumer.stop(); - } - catch (IllegalStateException e) - { - log.info("Was already stopped: {}", e.toString()); - } - catch (Exception e) - { - log.error("Unexpected exception while stopping EndlessConsumer: {}", e); - } + log.info("Signaling the consumer to quit its work"); + consumer.wakeup(); + } - try - { - log.info("Shutting down the ExecutorService."); - executor.shutdown(); - log.info("Waiting 5 seconds for the ExecutorService to terminate..."); - executor.awaitTermination(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - log.error("Exception while waiting for the termination of the ExecutorService: {}", e); - } - finally - { - if (!executor.isTerminated()) - { - log.warn("Forcing shutdown of ExecutorService!"); - executor - .shutdownNow() - .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName())); - } - log.info("Shutdow of ExecutorService finished"); - } + @Bean(destroyMethod = "close") + public Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); } diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 08c827c..23e9bec 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -6,7 +6,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.Optional; import org.springframework.kafka.core.ConsumerFactory; import java.util.concurrent.ExecutorService; @@ -18,54 +17,18 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler applicationRecordHandler( - AdderResults adderResults, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return new ApplicationRecordHandler( - adderResults, - Optional.ofNullable(applicationProperties.getThrottle()), - kafkaProperties.getClientId()); - } - - @Bean - public AdderResults adderResults() - { - return new AdderResults(); - } - - @Bean - public ApplicationRebalanceListener rebalanceListener( - ApplicationRecordHandler recordHandler, - AdderResults adderResults, - StateRepository stateRepository, - KafkaProperties kafkaProperties) - { - return new ApplicationRebalanceListener( - recordHandler, - adderResults, - stateRepository, - kafkaProperties.getClientId()); - } - - @Bean - public EndlessConsumer endlessConsumer( + public SimpleConsumer endlessConsumer( Consumer kafkaConsumer, ExecutorService executor, - ApplicationRebalanceListener rebalanceListener, - RecordHandler recordHandler, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { return - new EndlessConsumer<>( + new SimpleConsumer( executor, kafkaProperties.getClientId(), applicationProperties.getTopic(), - kafkaConsumer, - rebalanceListener, - recordHandler); + kafkaConsumer); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java deleted file mode 100644 index 03a14c8..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ /dev/null @@ -1,32 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.boot.actuate.health.Health; -import org.springframework.boot.actuate.health.HealthIndicator; -import org.springframework.stereotype.Component; - - -@Component -@RequiredArgsConstructor -public class ApplicationHealthIndicator implements HealthIndicator -{ - private final EndlessConsumer consumer; - - - @Override - public Health health() - { - try - { - return consumer - .exitStatus() - .map(Health::down) - .orElse(Health.outOfService()) - .build(); - } - catch (IllegalStateException e) - { - return Health.up().build(); - } - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 005460c..d46a8b3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull; import java.time.Duration; -@ConfigurationProperties(prefix = "sumup.adder") +@ConfigurationProperties(prefix = "simple.consumer") @Validated @Getter @Setter diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java deleted file mode 100644 index 0bfee67..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ /dev/null @@ -1,70 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.TopicPartition; - -import java.util.*; - - -@RequiredArgsConstructor -@Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener -{ - private final ApplicationRecordHandler recordHandler; - private final AdderResults adderResults; - private final StateRepository stateRepository; - private final String id; - - private final Set partitions = new HashSet<>(); - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - adding partition: {}", id, partition); - this.partitions.add(partition); - StateDocument document = - stateRepository - .findById(Integer.toString(partition)) - .orElse(new StateDocument(partition)); - recordHandler.addPartition(partition, document.state); - for (String user : document.state.keySet()) - { - log.info( - "{} - Restored state for partition={}|user={}: {}", - id, - partition, - user, - document.state.get(user)); - } - adderResults.addPartition(partition, document.results); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - removing partition: {}", id, partition); - this.partitions.remove(partition); - Map state = recordHandler.removePartition(partition); - for (String user : state.keySet()) - { - log.info( - "{} - Saved state for partition={}|user={}: {}", - id, - partition, - user, - state.get(user)); - } - Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results)); - }); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java deleted file mode 100644 index 2829157..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ /dev/null @@ -1,93 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -@RequiredArgsConstructor -@Slf4j -public class ApplicationRecordHandler implements RecordHandler -{ - private final AdderResults results; - private final Optional throttle; - private final String id; - - private final Map state = new HashMap<>(); - - - public void addNumber( - Integer partition, - String user, - MessageAddNumber message) - { - state.get(partition).addToSum(user, message.getNext()); - } - - public void calculateSum( - Integer partition, - String user, - MessageCalculateSum message) - { - AdderResult result = state.get(partition).calculate(user); - log.info("{} - New result for {}: {}", id, user, result); - results.addResults(partition, user, result); - } - - @Override - public void accept(ConsumerRecord record) - { - Integer partition = record.partition(); - String user = record.key(); - Message message = record.value(); - - switch(message.getType()) - { - case ADD: - addNumber(partition, user, (MessageAddNumber) message); - break; - - case CALC: - calculateSum(partition, user, (MessageCalculateSum) message); - break; - } - - if (throttle.isPresent()) - { - try - { - Thread.sleep(throttle.get().toMillis()); - } - catch (InterruptedException e) - { - log.warn("{} - Intrerrupted while throttling: {}", id, e); - } - } - } - - protected void addPartition(Integer partition, Map state) - { - this.state.put(partition, new AdderBusinessLogic(state)); - } - - protected Map removePartition(Integer partition) - { - return this.state.remove(partition).getState(); - } - - - public Map getState() - { - return state; - } - - public AdderBusinessLogic getState(Integer partition) - { - return state.get(partition); - } -} diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java deleted file mode 100644 index 26a5bc8..0000000 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ /dev/null @@ -1,89 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - - -@RestController -@RequiredArgsConstructor -public class DriverController -{ - private final EndlessConsumer consumer; - private final ApplicationRecordHandler recordHandler; - private final AdderResults results; - - - @PostMapping("start") - public void start() - { - consumer.start(); - } - - @PostMapping("stop") - public void stop() throws ExecutionException, InterruptedException - { - consumer.stop(); - } - - - @GetMapping("state") - public Map> state() - { - return - recordHandler - .getState() - .entrySet() - .stream() - .collect(Collectors.toMap( - entry -> entry.getKey(), - entry -> entry.getValue().getState())); - } - - @GetMapping("state/{user}") - public ResponseEntity state(@PathVariable String user) - { - for (AdderBusinessLogic adder : recordHandler.getState().values()) - { - Optional sum = adder.getSum(user); - if (sum.isPresent()) - return ResponseEntity.ok(sum.get()); - } - - return ResponseEntity.notFound().build(); - } - - @GetMapping("results") - public Map>> results() - { - return results.getState(); - } - - @GetMapping("results/{user}") - public ResponseEntity> results(@PathVariable String user) - { - for (Map> resultsByUser : this.results.getState().values()) - { - List results = resultsByUser.get(user); - if (results != null) - return ResponseEntity.ok(results); - } - - return ResponseEntity.notFound().build(); - } - - - @ExceptionHandler - @ResponseStatus(HttpStatus.BAD_REQUEST) - public ErrorResponse illegalStateException(IllegalStateException e) - { - return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); - } -} diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 00678c4..ba8eb27 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.time.Duration; @@ -17,9 +18,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +@Component @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements Runnable +public class EndlessConsumer implements Runnable { private final ExecutorService executor; private final String id; diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java deleted file mode 100644 index 5ca206d..0000000 --- a/src/main/java/de/juplo/kafka/ErrorResponse.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import lombok.Value; - - -@Value -public class ErrorResponse -{ - private final String error; - private final Integer status; -} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java deleted file mode 100644 index 327ac9f..0000000 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ /dev/null @@ -1,10 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.function.Consumer; - - -public interface RecordHandler extends Consumer> -{ -} diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java deleted file mode 100644 index ae8eb51..0000000 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ /dev/null @@ -1,41 +0,0 @@ -package de.juplo.kafka; - -import lombok.ToString; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -@Document(collection = "state") -@ToString -public class StateDocument -{ - @Id - public String id; - public Map state; - public Map> results; - - public StateDocument() - { - } - - public StateDocument(Integer partition) - { - this.id = Integer.toString(partition); - this.state = new HashMap<>(); - this.results = new HashMap<>(); - } - - public StateDocument( - Integer partition, - Map state, - Map> results) - { - this.id = Integer.toString(partition); - this.state = state; - this.results = results; - } -} diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java deleted file mode 100644 index 3129535..0000000 --- a/src/main/java/de/juplo/kafka/StateRepository.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.data.mongodb.repository.MongoRepository; - -import java.util.Optional; - - -public interface StateRepository extends MongoRepository -{ - public Optional findById(String partition); -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 92f3a6b..c2cb792 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,6 @@ -sumup: - adder: - topic: out +simple: + consumer: + topic: test management: endpoint: shutdown: @@ -22,10 +22,6 @@ info: topic: ${consumer.topic} auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} spring: - data: - mongodb: - uri: mongodb://juplo:training@localhost:27017 - database: juplo kafka: bootstrap-servers: :9092 client-id: DEV diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java deleted file mode 100644 index 8e49263..0000000 --- a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java +++ /dev/null @@ -1,117 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.Arrays; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.*; - - -public class AdderBusinessLogicTest -{ - @Test - @DisplayName("An empty Optional should be returned, for a non-existing sum") - public void testGetSumReturnsEmptyOptionalForNonExistingSum() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThat(adder.getSum("foo")).isEmpty(); - } - - @Test - @DisplayName("A non-empty Optional should be returned, for an existing sum") - public void testGetSumReturnsNonEmptyOptionalForExistingSum() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThat(adder.getSum("foo")).isNotEmpty(); - } - - @Test - @DisplayName("A sum can be calculated, if it does exist") - public void testCalculatePossibleIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThatNoException().isThrownBy(() -> adder.calculate("foo")); - } - - @Test - @DisplayName("An existing sum is removed, if ended") - public void testCalculateRemovesSumIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - adder.calculate("foo"); - assertThat(adder.getSum("foo")).isEmpty(); - } - - @Test - @DisplayName("An existing sum returns a non-null value, if calculated") - public void testCalculateReturnsNonNullValueIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThat(adder.calculate("foo")).isNotNull(); - } - - @Test - @DisplayName("Ending a non-existing sum, causes an IllegalStateException") - public void testCalculateCausesExceptionIfNotExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo")); - } - - @Test - @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException") - public void testAddToSumWithNullValueCausesException() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null)); - } - - @ParameterizedTest(name = "{index}: Adding {0}") - @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException") - @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE }) - public void testAddToSumWithNonPositiveValueCausesException(int value) - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value)); - } - - @ParameterizedTest(name = "{index}: Adding {0}") - @DisplayName("Can add a positive value to a sum") - @ValueSource(ints = { 1, 3, 6, 66, 7, 9 }) - public void testAddToSumWithPositiveValuePossible(int value) - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value)); - } - - @ParameterizedTest(name = "{index}: Summing up {0}") - @DisplayName("Adds up numbers correctly") - @MethodSource("numbersProvider") - public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers) - { - long expectedResult = Arrays.stream(numbers).sum(); - AdderBusinessLogic adder = new AdderBusinessLogic(); - Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number)); - AdderResult result = adder.calculate("foo"); - assertThat(result.number).isEqualTo(numbers[numbers.length-1]); - assertThat(result.sum).isEqualTo(expectedResult); - } - - static Stream numbersProvider() { - return Stream.of( - Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()), - Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()), - Arguments.of((Object) IntStream.rangeClosed(1,66).toArray())); - } -} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java deleted file mode 100644 index bd9f449..0000000 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ /dev/null @@ -1,172 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.*; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.assertj.core.api.Assertions.assertThat; - - -@Slf4j -public class ApplicationTests extends GenericApplicationTests -{ - @Autowired - StateRepository stateRepository; - - - public ApplicationTests() - { - super(new ApplicationTestRecrodGenerator()); - ((ApplicationTestRecrodGenerator)recordGenerator).tests = this; - } - - - static class ApplicationTestRecrodGenerator implements RecordGenerator - { - ApplicationTests tests; - - final int[] numbers = {1, 77, 33, 2, 66, 666, 11}; - final String[] dieWilden13 = - IntStream - .range(1, 14) - .mapToObj(i -> "seeräuber-" + i) - .toArray(i -> new String[i]); - final StringSerializer stringSerializer = new StringSerializer(); - final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}")); - - int counter = 0; - - Map> state; - - @Override - public int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender) - { - counter = 0; - state = - Arrays - .stream(dieWilden13) - .collect(Collectors.toMap( - seeräuber -> seeräuber, - seeräuber -> new LinkedList())); - - int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; - int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; - int next = 0; - - for (int pass = 0; pass < 333; pass++) - { - for (int i = 0; i<13; i++) - { - String seeräuber = dieWilden13[i]; - Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber)); - - if (message[i] > number[i]) - { - send( - key, - calculateMessage, - Message.Type.CALC, - poisonPill(poisonPills, pass, counter), - logicError(logicErrors, pass, counter), - messageSender); - state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2)); - // Pick next number to calculate - number[i] = numbers[next++%numbers.length]; - message[i] = 1; - log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]); - } - - send( - key, - new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")), - Message.Type.ADD, - poisonPill(poisonPills, pass, counter), - logicError(logicErrors, pass, counter), - messageSender); - } - } - - return counter; - } - - boolean poisonPill (boolean poisonPills, int pass, int counter) - { - return poisonPills && pass > 300 && counter%99 == 0; - } - - boolean logicError(boolean logicErrors, int pass, int counter) - { - return logicErrors && pass > 300 && counter%77 == 0; - } - - void send( - Bytes key, - Bytes value, - Message.Type type, - boolean poisonPill, - boolean logicError, - Consumer> messageSender) - { - counter++; - - if (logicError) - { - value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}")); - } - if (poisonPill) - { - value = new Bytes("BOOM!".getBytes()); - } - - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); - record.headers().add("__TypeId__", type.toString().getBytes()); - messageSender.accept(record); - } - - @Override - public void assertBusinessLogic() - { - for (int i=0; i - { - String user = entry.getKey(); - List resultsForUser = entry.getValue(); - - for (int j=0; j < resultsForUser.size(); j++) - { - if (!(j < state.get(user).size())) - { - break; - } - - assertThat(resultsForUser.get(j)) - .as("Unexpected results calculation %d of user %s", j, user) - .isEqualTo(state.get(user).get(j)); - } - - assertThat(state.get(user)) - .as("More results calculated for user %s as expected", user) - .containsAll(resultsForUser); - }); - } - } - } -} diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java deleted file mode 100644 index 606218f..0000000 --- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java +++ /dev/null @@ -1,60 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.extension.ConditionEvaluationResult; -import org.junit.jupiter.api.extension.ExecutionCondition; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.platform.commons.util.AnnotationUtils; - -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - - -public class ErrorCannotBeGeneratedCondition implements ExecutionCondition -{ - @Override - public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) - { - final Optional optional = - AnnotationUtils.findAnnotation( - context.getElement(), - SkipWhenErrorCannotBeGenerated.class); - - if (context.getTestInstance().isEmpty()) - return ConditionEvaluationResult.enabled("Test-instance ist not available"); - - if (optional.isPresent()) - { - SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get(); - GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get(); - List missingRequiredErrors = new LinkedList<>(); - - if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill()) - missingRequiredErrors.add("Poison-Pill"); - - if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError()) - missingRequiredErrors.add("Logic-Error"); - - StringBuilder builder = new StringBuilder(); - builder.append(context.getTestClass().get().getSimpleName()); - - if (missingRequiredErrors.isEmpty()) - { - builder.append(" can generate all required types of errors"); - return ConditionEvaluationResult.enabled(builder.toString()); - } - - builder.append(" cannot generate the required error(s): "); - builder.append( - missingRequiredErrors - .stream() - .collect(Collectors.joining(", "))); - - return ConditionEvaluationResult.disabled(builder.toString()); - } - - return ConditionEvaluationResult.enabled( - "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName()); - } -} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java deleted file mode 100644 index 937b40f..0000000 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ /dev/null @@ -1,396 +0,0 @@ -package de.juplo.kafka; - -import com.mongodb.client.MongoClient; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.serialization.*; -import org.apache.kafka.common.utils.Bytes; -import org.junit.jupiter.api.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.autoconfigure.mongo.MongoProperties; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; -import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Import; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; - -import java.time.Duration; -import java.util.*; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static de.juplo.kafka.GenericApplicationTests.PARTITIONS; -import static de.juplo.kafka.GenericApplicationTests.TOPIC; -import static org.assertj.core.api.Assertions.*; -import static org.awaitility.Awaitility.*; - - -@SpringJUnitConfig( - initializers = ConfigDataApplicationContextInitializer.class, - classes = { - KafkaAutoConfiguration.class, - ApplicationTests.Configuration.class }) -@TestPropertySource( - properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "sumup.adder.topic=" + TOPIC, - "spring.kafka.consumer.auto-commit-interval=500ms", - "spring.mongodb.embedded.version=4.4.13" }) -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -@EnableAutoConfiguration -@AutoConfigureDataMongo -@Slf4j -abstract class GenericApplicationTests -{ - public static final String TOPIC = "FOO"; - public static final int PARTITIONS = 10; - - - @Autowired - org.apache.kafka.clients.consumer.Consumer kafkaConsumer; - @Autowired - KafkaProperties kafkaProperties; - @Autowired - ApplicationProperties applicationProperties; - @Autowired - MongoClient mongoClient; - @Autowired - MongoProperties mongoProperties; - @Autowired - TestRecordHandler recordHandler; - @Autowired - EndlessConsumer endlessConsumer; - - KafkaProducer testRecordProducer; - KafkaConsumer offsetConsumer; - Map oldOffsets; - - - final RecordGenerator recordGenerator; - final Consumer> messageSender; - - public GenericApplicationTests(RecordGenerator recordGenerator) - { - this.recordGenerator = recordGenerator; - this.messageSender = (record) -> sendMessage(record); - } - - - /** Tests methods */ - - @Test - void commitsCurrentOffsetsOnSuccess() throws Exception - { - int numberOfGeneratedMessages = - recordGenerator.generate(false, false, messageSender); - - await(numberOfGeneratedMessages + " records received") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); - - await("Offsets committed") - .atMost(Duration.ofSeconds(10)) - .pollInterval(Duration.ofSeconds(1)) - .untilAsserted(() -> - { - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - }); - - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> endlessConsumer.exitStatus()) - .describedAs("Consumer should still be running"); - - endlessConsumer.stop(); - recordGenerator.assertBusinessLogic(); - } - - @Test - @SkipWhenErrorCannotBeGenerated(poisonPill = true) - void commitsOffsetOfErrorForReprocessingOnDeserializationError() - { - int numberOfGeneratedMessages = - recordGenerator.generate(true, false, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); - - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RecordDeserializationException.class); - - recordGenerator.assertBusinessLogic(); - } - - @Test - @SkipWhenErrorCannotBeGenerated(logicError = true) - void doesNotCommitOffsetsOnLogicError() - { - int numberOfGeneratedMessages = - recordGenerator.generate(false, true, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); - - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RuntimeException.class); - - recordGenerator.assertBusinessLogic(); - } - - - /** Helper methods for the verification of expectations */ - - void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck) - { - doForCurrentOffsets((tp, offset) -> - { - Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected); - assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") - .isEqualTo(expected); - }); - } - - void assertSeenOffsetsAreBehindCommittedOffsets(Map offsetsToCheck) - { - List isOffsetBehindSeen = new LinkedList<>(); - - doForCurrentOffsets((tp, offset) -> - { - Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); - assertThat(offset) - .describedAs("Committed offset must be at most equal to the offset of the consumer") - .isLessThanOrEqualTo(expected); - isOffsetBehindSeen.add(offset < expected); - }); - - assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next)) - .describedAs("Committed offsets are behind seen offsets") - .isTrue(); - } - - void checkSeenOffsetsForProgress() - { - // Be sure, that some messages were consumed...! - Set withProgress = new HashSet<>(); - partitions().forEach(tp -> - { - Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = recordHandler.seenOffsets.get(tp) + 1; - if (!oldOffset.equals(newOffset)) - { - log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); - withProgress.add(tp); - } - }); - assertThat(withProgress) - .describedAs("Some offsets must have changed, compared to the old offset-positions") - .isNotEmpty(); - } - - - /** Helper methods for setting up and running the tests */ - - void seekToEnd() - { - offsetConsumer.assign(partitions()); - offsetConsumer.seekToEnd(partitions()); - partitions().forEach(tp -> - { - // seekToEnd() works lazily: it only takes effect on poll()/position() - Long offset = offsetConsumer.position(tp); - log.info("New position for {}: {}", tp, offset); - }); - // The new positions must be commited! - offsetConsumer.commitSync(); - offsetConsumer.unsubscribe(); - } - - void doForCurrentOffsets(BiConsumer consumer) - { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); - } - - List partitions() - { - return - IntStream - .range(0, PARTITIONS) - .mapToObj(partition -> new TopicPartition(TOPIC, partition)) - .collect(Collectors.toList()); - } - - - public interface RecordGenerator - { - int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender); - - default boolean canGeneratePoisonPill() - { - return true; - } - - default boolean canGenerateLogicError() - { - return true; - } - - default void assertBusinessLogic() - { - log.debug("No business-logic to assert"); - } - } - - void sendMessage(ProducerRecord record) - { - testRecordProducer.send(record, (metadata, e) -> - { - if (metadata != null) - { - log.debug( - "{}|{} - {}={}", - metadata.partition(), - metadata.offset(), - record.key(), - record.value()); - } - else - { - log.warn( - "Exception for {}={}: {}", - record.key(), - record.value(), - e.toString()); - } - }); - } - - - @BeforeEach - public void init() - { - Properties props; - props = new Properties(); - props.put("bootstrap.servers", kafkaProperties.getBootstrapServers()); - props.put("linger.ms", 100); - props.put("key.serializer", BytesSerializer.class.getName()); - props.put("value.serializer", BytesSerializer.class.getName()); - testRecordProducer = new KafkaProducer<>(props); - - props = new Properties(); - props.put("bootstrap.servers", kafkaProperties.getBootstrapServers()); - props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", kafkaProperties.getConsumer().getGroupId()); - props.put("key.deserializer", BytesDeserializer.class.getName()); - props.put("value.deserializer", BytesDeserializer.class.getName()); - offsetConsumer = new KafkaConsumer<>(props); - - mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); - seekToEnd(); - - oldOffsets = new HashMap<>(); - recordHandler.seenOffsets = new HashMap<>(); - recordHandler.receivedRecords = new HashSet<>(); - - doForCurrentOffsets((tp, offset) -> - { - oldOffsets.put(tp, offset - 1); - recordHandler.seenOffsets.put(tp, offset - 1); - }); - - endlessConsumer.start(); - } - - @AfterEach - public void deinit() - { - try - { - endlessConsumer.stop(); - } - catch (Exception e) - { - log.debug("{}", e.toString()); - } - - try - { - testRecordProducer.close(); - offsetConsumer.close(); - } - catch (Exception e) - { - log.info("Exception while stopping the consumer: {}", e.toString()); - } - } - - - @TestConfiguration - @Import(ApplicationConfiguration.class) - public static class Configuration - { - @Bean - public RecordHandler recordHandler(RecordHandler applicationRecordHandler) - { - return new TestRecordHandler(applicationRecordHandler); - } - } -} diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java deleted file mode 100644 index 6d15e9e..0000000 --- a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.extension.ExtendWith; - -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; - - -@Retention(RetentionPolicy.RUNTIME) -@ExtendWith(ErrorCannotBeGeneratedCondition.class) -public @interface SkipWhenErrorCannotBeGenerated -{ - boolean poisonPill() default false; - boolean logicError() default false; -} diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java deleted file mode 100644 index 37d3f65..0000000 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ /dev/null @@ -1,34 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; - -import java.util.Map; -import java.util.Set; - - -@RequiredArgsConstructor -public class TestRecordHandler implements RecordHandler -{ - private final RecordHandler handler; - - Map seenOffsets; - Set> receivedRecords; - - - public void onNewRecord(ConsumerRecord record) - { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - } - - @Override - public void accept(ConsumerRecord record) - { - this.onNewRecord(record); - handler.accept(record); - } -}