From 1bf30f5890d9ab0a1c7550fe472dec44f486a473 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 18 Sep 2022 17:52:12 +0200 Subject: [PATCH] Einfacher geht es nicht mehr * Die `id` kann nur weggelassen werden, wenn `assign()` verwendet wird. --- README.sh | 89 ++----------- docker-compose.yml | 80 +----------- pom.xml | 19 +-- .../de/juplo/kafka/AdderBusinessLogic.java | 55 -------- src/main/java/de/juplo/kafka/AdderResult.java | 21 ---- .../java/de/juplo/kafka/AdderResults.java | 47 ------- src/main/java/de/juplo/kafka/Application.java | 103 +-------------- .../de/juplo/kafka/ApplicationController.java | 39 ------ .../kafka/ApplicationHealthIndicator.java | 23 ---- .../de/juplo/kafka/ApplicationProperties.java | 23 ---- .../kafka/ApplicationRebalanceListener.java | 71 ----------- .../juplo/kafka/ApplicationRecordHandler.java | 84 ------------- src/main/java/de/juplo/kafka/Message.java | 9 -- .../java/de/juplo/kafka/MessageAddNumber.java | 19 --- .../de/juplo/kafka/MessageCalculateSum.java | 16 --- .../java/de/juplo/kafka/StateDocument.java | 41 ------ .../java/de/juplo/kafka/StateRepository.java | 11 -- src/main/resources/application.yml | 20 --- .../juplo/kafka/AdderBusinessLogicTest.java | 117 ------------------ ...plicationIT.java => ApplicationTests.java} | 15 +-- src/test/java/de/juplo/kafka/MessageTest.java | 39 ------ 21 files changed, 26 insertions(+), 915 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/AdderBusinessLogic.java delete mode 100644 src/main/java/de/juplo/kafka/AdderResult.java delete mode 100644 src/main/java/de/juplo/kafka/AdderResults.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationController.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationRecordHandler.java delete mode 100644 src/main/java/de/juplo/kafka/Message.java delete mode 100644 src/main/java/de/juplo/kafka/MessageAddNumber.java delete mode 100644 src/main/java/de/juplo/kafka/MessageCalculateSum.java delete mode 100644 src/main/java/de/juplo/kafka/StateDocument.java delete mode 100644 src/main/java/de/juplo/kafka/StateRepository.java delete mode 100644 src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java rename src/test/java/de/juplo/kafka/{ApplicationIT.java => ApplicationTests.java} (62%) delete mode 100644 src/test/java/de/juplo/kafka/MessageTest.java diff --git a/README.sh b/README.sh index a2d813d..69d05ee 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/sumup-adder-springified:1.0-SNAPSHOT +IMAGE=juplo/supersimple:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -9,17 +9,16 @@ then exit fi -docker-compose rm -svf adder-1 adder-2 -docker-compose rm -svf mongo -docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express +docker-compose rm -svf supersimple +docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli if [[ $(docker image ls -q $IMAGE) == "" || "$1" = "build" ]] then - docker-compose rm -svf adder-1 adder-2 - mvn -D skipTests clean install || exit + docker-compose rm -svf supersimple + mvn clean install || exit else echo "Using image existing images:" docker image ls $IMAGE @@ -28,84 +27,14 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d gateway requests-1 requests-2 +docker-compose up -d gateway requests-1 requests-2 supersimple while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done -docker-compose up -d peter klaus +echo 6 | http -v :8080/peter -docker-compose up -d adder-1 -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done -while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done -http -v --pretty none -S :8091/results -echo +sleep 10 -sleep 3 -echo "Resultate für adder-1" -http -v --pretty none -S :8091/results -echo - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq - - -docker-compose up -d adder-2 -while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done -while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done -http -v --pretty none -S :8092/results -echo - -sleep 3 -echo "Resultate für adder-2" -http -v --pretty none -S :8092/results -echo - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq - -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq - -docker-compose stop adder-1 -until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done -until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done - -echo "Resultate für adder-2" -http -v --pretty none -S :8092/results -echo - -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq - -docker-compose kill -s 9 adder-2 -docker-compose start adder-1 -docker-compose kill -s 9 peter klaus -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done -until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done -until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done - -echo "Resultate für adder-1" -http -v --pretty none -S :8091/results -echo - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq - -sleep 5 - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq +docker-compose logs supersimple diff --git a/docker-compose.yml b/docker-compose.yml index a3da553..69e67a2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -60,38 +60,15 @@ services: depends_on: - zookeeper - mongo: - image: mongo:4.4.13 - ports: - - 27017:27017 - environment: - MONGO_INITDB_ROOT_USERNAME: juplo - MONGO_INITDB_ROOT_PASSWORD: training - - express: - image: mongo-express - ports: - - 8090:8081 - environment: - ME_CONFIG_MONGODB_ADMINUSERNAME: juplo - ME_CONFIG_MONGODB_ADMINPASSWORD: training - ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/ - depends_on: - - mongo setup: image: juplo/toolbox command: > bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out.DLT - kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2 - kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2 - kafka-topics --bootstrap-server kafka:9092 --create --topic out.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2 kafka-topics --bootstrap-server kafka:9092 --describe --topic in kafka-topics --bootstrap-server kafka:9092 --describe --topic out - kafka-topics --bootstrap-server kafka:9092 --describe --topic out.DLT " cli: @@ -126,58 +103,13 @@ services: sumup.requests.bootstrap-server: kafka:9092 sumup.requests.client-id: requests-2 - adder-1: - image: juplo/sumup-adder-springified:1.0-SNAPSHOT - ports: - - 8091:8080 - environment: - server.port: 8080 - spring.kafka.bootstrap-servers: kafka:9092 - spring.kafka.producer.bootstrap-servers: kafka:9092 - spring.kafak.client-id: adder-1 - spring.kafka.auto-commit-interval: 1s - sumup.adder.throttle: 3ms - spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 - spring.data.mongodb.database: juplo - logging.level.org.apache.kafka.clients.consumer: INFO - - adder-2: - image: juplo/sumup-adder-springified:1.0-SNAPSHOT + supersimple: + image: juplo/supersimple:1.0-SNAPSHOT ports: - - 8092:8080 + - 8090:8080 environment: server.port: 8080 spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.producer.bootstrap-servers: kafka:9092 - spring.kafak.client-id: adder-2 - spring.kafka.auto-commit-interval: 1s - sumup.adder.throttle: 3ms - spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 - spring.data.mongodb.database: juplo + spring.kafak.client-id: supersimple logging.level.org.apache.kafka.clients.consumer: INFO - - peter: - image: juplo/toolbox - command: > - bash -c " - while [[ true ]]; - do - echo 666 | http -v gateway:8080/peter; - sleep 1; - done - " - klaus: - image: juplo/toolbox - command: > - bash -c " - while [[ true ]]; - do - echo 666 | http -v gateway:8080/klaus; - sleep 1; - done - " - - dlt: - image: juplo/toolbox - tty: true - command: kafkacat -C -b kafka:9092 -t out.DLT -f'p=%p|o=%o|%k=%s\n' -o 0 -q diff --git a/pom.xml b/pom.xml index a252d1c..9fa4884 100644 --- a/pom.xml +++ b/pom.xml @@ -12,10 +12,10 @@ de.juplo.kafka - sumup-adder-springified + supersimple 1.0-SNAPSHOT - SumUp Adder - Calculates the sum for the send messages. This version consumes JSON-messages. + Supersimple Consumer-Group + Most minimal Consumer-Group ever! 11 @@ -26,14 +26,6 @@ org.springframework.boot spring-boot-starter-web - - org.springframework.boot - spring-boot-starter-data-mongodb - - - org.springframework.boot - spring-boot-starter-validation - org.springframework.boot spring-boot-starter-actuator @@ -66,11 +58,6 @@ awaitility test - - de.flapdoodle.embed - de.flapdoodle.embed.mongo - test - org.assertj assertj-core diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java deleted file mode 100644 index d525182..0000000 --- a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.juplo.kafka; - - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -public class AdderBusinessLogic -{ - private final Map state; - - - public AdderBusinessLogic() - { - this(new HashMap<>()); - } - - public AdderBusinessLogic(Map state) - { - this.state = state; - } - - - public synchronized Optional getSum(String user) - { - return Optional.ofNullable(state.get(user)).map(result -> result.sum); - } - - public synchronized void addToSum(String user, Integer value) - { - if (value == null || value < 1) - throw new IllegalArgumentException("Not a positive number: " + value); - - long sum = - Optional - .ofNullable(state.get(user)) - .map(result -> result.sum) - .orElse(0l); - state.put(user, new AdderResult(value, sum + value)); - } - - public synchronized AdderResult calculate(String user) - { - if (!state.containsKey(user)) - throw new IllegalStateException("No sumation for " + user + " in progress"); - - return state.remove(user); - } - - protected Map getState() - { - return state; - } -} diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java deleted file mode 100644 index 44b7da8..0000000 --- a/src/main/java/de/juplo/kafka/AdderResult.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; - - -@RequiredArgsConstructor -@Getter -@EqualsAndHashCode -public class AdderResult -{ - final int number; - final long sum; - - @Override - public String toString() - { - return "sum(" + number + ") = " + sum; - } -} diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java deleted file mode 100644 index e7f5602..0000000 --- a/src/main/java/de/juplo/kafka/AdderResults.java +++ /dev/null @@ -1,47 +0,0 @@ -package de.juplo.kafka; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - - -public class AdderResults -{ - private final Map>> results = new HashMap<>(); - - - public void addResults(Integer partition, String user, AdderResult result) - { - Map> resultsByUser = this.results.get(partition); - - List results = resultsByUser.get(user); - if (results == null) - { - results = new LinkedList<>(); - resultsByUser.put(user, results); - } - - results.add(result); - } - - protected void addPartition(Integer partition, Map> results) - { - this.results.put(partition, results); - } - - protected Map> removePartition(Integer partition) - { - return this.results.remove(partition); - } - - public Map>> getState() - { - return results; - } - - public Map> getState(Integer partition) - { - return results.get(partition); - } -} diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 69a9712..0f9ea12 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,115 +1,22 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaOperations; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; -import org.springframework.kafka.listener.DefaultErrorHandler; -import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.util.backoff.FixedBackOff; - -import java.util.Map; -import java.util.Optional; - +import org.springframework.kafka.annotation.KafkaListener; @SpringBootApplication -@Slf4j -@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class }) @EnableKafka +@Slf4j public class Application { - @Bean - public ApplicationRecordHandler applicationRecordHandler( - AdderResults adderResults, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return new ApplicationRecordHandler( - adderResults, - Optional.ofNullable(applicationProperties.getThrottle()), - kafkaProperties.getConsumer().getGroupId()); - } - - @Bean - public AdderResults adderResults() - { - return new AdderResults(); - } - - @Bean - public ApplicationRebalanceListener rebalanceListener( - ApplicationRecordHandler recordHandler, - AdderResults adderResults, - StateRepository stateRepository, - KafkaProperties kafkaProperties) - { - return new ApplicationRebalanceListener( - recordHandler, - adderResults, - stateRepository, - kafkaProperties.getConsumer().getGroupId()); - } - - @Bean - ApplicationHealthIndicator applicationHealthIndicator( - KafkaListenerEndpointRegistry registry, - KafkaProperties properties) + @KafkaListener(id = "supersimple", topics = "out") + public void recieve(String message) { - return new ApplicationHealthIndicator( - properties.getConsumer().getGroupId(), - registry); + log.info("Recieved message: {}", message); } - @Bean - public ProducerFactory producerFactory( - KafkaProperties properties) - { - return new DefaultKafkaProducerFactory<>( - properties.getProducer().buildProperties(), - new StringSerializer(), - new DelegatingByTypeSerializer( - Map.of( - byte[].class, new ByteArraySerializer(), - MessageAddNumber.class, new JsonSerializer<>(), - MessageCalculateSum.class, new JsonSerializer<>()))); - } - - @Bean - public KafkaTemplate kafkaTemplate( - ProducerFactory producerFactory) - { - return new KafkaTemplate<>(producerFactory); - } - - @Bean - public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( - KafkaOperations kafkaTemplate) - { - return new DeadLetterPublishingRecoverer(kafkaTemplate); - } - - @Bean - public DefaultErrorHandler errorHandler( - DeadLetterPublishingRecoverer recoverer) - { - return new DefaultErrorHandler( - recoverer, - new FixedBackOff(0l, 0l)); - } - - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationController.java b/src/main/java/de/juplo/kafka/ApplicationController.java deleted file mode 100644 index 0a9890c..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationController.java +++ /dev/null @@ -1,39 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.data.mongodb.core.aggregation.ArithmeticOperators; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - - -@RestController -@RequiredArgsConstructor -public class ApplicationController -{ - private final AdderResults results; - - - @GetMapping("results") - public Map>> results() - { - return results.getState(); - } - - @GetMapping("results/{user}") - public ResponseEntity> results(@PathVariable String user) - { - for (Map> resultsByUser : this.results.getState().values()) - { - List results = resultsByUser.get(user); - if (results != null) - return ResponseEntity.ok(results); - } - - return ResponseEntity.notFound().build(); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java deleted file mode 100644 index 0466df4..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ /dev/null @@ -1,23 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.boot.actuate.health.Health; -import org.springframework.boot.actuate.health.HealthIndicator; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; - - -@RequiredArgsConstructor -public class ApplicationHealthIndicator implements HealthIndicator -{ - private final String id; - private final KafkaListenerEndpointRegistry registry; - - - @Override - public Health health() - { - return registry.getListenerContainer(id).isRunning() - ? Health.up().build() - : Health.down().build(); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index 005460c..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,23 +0,0 @@ -package de.juplo.kafka; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; -import java.time.Duration; - - -@ConfigurationProperties(prefix = "sumup.adder") -@Validated -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - @NotEmpty - private String topic; - private Duration throttle; -} diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java deleted file mode 100644 index ba15227..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ /dev/null @@ -1,71 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.TopicPartition; -import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; - -import java.util.*; - - -@RequiredArgsConstructor -@Slf4j -public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListener -{ - private final ApplicationRecordHandler recordHandler; - private final AdderResults adderResults; - private final StateRepository stateRepository; - private final String id; - - private final Set partitions = new HashSet<>(); - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - adding partition: {}", id, partition); - this.partitions.add(partition); - StateDocument document = - stateRepository - .findById(Integer.toString(partition)) - .orElse(new StateDocument(partition)); - recordHandler.addPartition(partition, document.state); - for (String user : document.state.keySet()) - { - log.info( - "{} - Restored state for partition={}|user={}: {}", - id, - partition, - user, - document.state.get(user)); - } - adderResults.addPartition(partition, document.results); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - removing partition: {}", id, partition); - this.partitions.remove(partition); - Map state = recordHandler.removePartition(partition); - for (String user : state.keySet()) - { - log.info( - "{} - Saved state for partition={}|user={}: {}", - id, - partition, - user, - state.get(user)); - } - Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results)); - }); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java deleted file mode 100644 index 2075781..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ /dev/null @@ -1,84 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.kafka.annotation.KafkaHandler; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -@RequiredArgsConstructor -@Slf4j -@KafkaListener( - id = "${spring.kafka.consumer.group-id}", - topics = "${sumup.adder.topic}") -public class ApplicationRecordHandler -{ - private final AdderResults results; - private final Optional throttle; - private final String id; - - private final Map state = new HashMap<>(); - - - @KafkaHandler - public void addNumber( - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) - Integer partition, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) - String user, - @Payload - MessageAddNumber message) - { - log.debug("{} - Received {} for {} on {}", id, message, user, partition); - state.get(partition).addToSum(user, message.getNext()); - throttle(); - } - - @KafkaHandler - public void calculateSum( - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) - Integer partition, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) - String user, - @Payload - MessageCalculateSum message) - { - AdderResult result = state.get(partition).calculate(user); - log.info("{} - New result for {}: {}", id, user, result); - results.addResults(partition, user, result); - throttle(); - } - - private void throttle() - { - if (throttle.isPresent()) - { - try - { - Thread.sleep(throttle.get().toMillis()); - } - catch (InterruptedException e) - { - log.warn("{} - Intrerrupted while throttling: {}", id, e); - } - } - } - - protected void addPartition(Integer partition, Map state) - { - this.state.put(partition, new AdderBusinessLogic(state)); - } - - protected Map removePartition(Integer partition) - { - return this.state.remove(partition).getState(); - } -} diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java deleted file mode 100644 index e4999b7..0000000 --- a/src/main/java/de/juplo/kafka/Message.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.juplo.kafka; - - -public abstract class Message -{ - public enum Type {ADD, CALC} - - public abstract Type getType(); -} diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java deleted file mode 100644 index c024b65..0000000 --- a/src/main/java/de/juplo/kafka/MessageAddNumber.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class MessageAddNumber extends Message -{ - private Integer next; - - - @Override - public Type getType() - { - return Type.ADD; - } -} diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java deleted file mode 100644 index afc5a39..0000000 --- a/src/main/java/de/juplo/kafka/MessageCalculateSum.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class MessageCalculateSum extends Message -{ - @Override - public Type getType() - { - return Type.CALC; - } -} diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java deleted file mode 100644 index ae8eb51..0000000 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ /dev/null @@ -1,41 +0,0 @@ -package de.juplo.kafka; - -import lombok.ToString; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -@Document(collection = "state") -@ToString -public class StateDocument -{ - @Id - public String id; - public Map state; - public Map> results; - - public StateDocument() - { - } - - public StateDocument(Integer partition) - { - this.id = Integer.toString(partition); - this.state = new HashMap<>(); - this.results = new HashMap<>(); - } - - public StateDocument( - Integer partition, - Map state, - Map> results) - { - this.id = Integer.toString(partition); - this.state = state; - this.results = results; - } -} diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java deleted file mode 100644 index 3129535..0000000 --- a/src/main/java/de/juplo/kafka/StateRepository.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.data.mongodb.repository.MongoRepository; - -import java.util.Optional; - - -public interface StateRepository extends MongoRepository -{ - public Optional findById(String partition); -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a95e976..ee1bb64 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,3 @@ -sumup: - adder: - topic: out management: endpoint: shutdown: @@ -15,25 +12,8 @@ management: java: enabled: true spring: - data: - mongodb: - uri: mongodb://juplo:training@localhost:27017 - database: juplo kafka: bootstrap-servers: :9092 - consumer: - group-id: my-group - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer - properties: - partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor - metadata.max.age.ms: 1000 - spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer - spring.json.type.mapping: > - ADD:de.juplo.kafka.MessageAddNumber, - CALC:de.juplo.kafka.MessageCalculateSum - producer: - bootstrap-servers: :9092 logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java deleted file mode 100644 index 8e49263..0000000 --- a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java +++ /dev/null @@ -1,117 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.Arrays; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.*; - - -public class AdderBusinessLogicTest -{ - @Test - @DisplayName("An empty Optional should be returned, for a non-existing sum") - public void testGetSumReturnsEmptyOptionalForNonExistingSum() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThat(adder.getSum("foo")).isEmpty(); - } - - @Test - @DisplayName("A non-empty Optional should be returned, for an existing sum") - public void testGetSumReturnsNonEmptyOptionalForExistingSum() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThat(adder.getSum("foo")).isNotEmpty(); - } - - @Test - @DisplayName("A sum can be calculated, if it does exist") - public void testCalculatePossibleIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThatNoException().isThrownBy(() -> adder.calculate("foo")); - } - - @Test - @DisplayName("An existing sum is removed, if ended") - public void testCalculateRemovesSumIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - adder.calculate("foo"); - assertThat(adder.getSum("foo")).isEmpty(); - } - - @Test - @DisplayName("An existing sum returns a non-null value, if calculated") - public void testCalculateReturnsNonNullValueIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThat(adder.calculate("foo")).isNotNull(); - } - - @Test - @DisplayName("Ending a non-existing sum, causes an IllegalStateException") - public void testCalculateCausesExceptionIfNotExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo")); - } - - @Test - @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException") - public void testAddToSumWithNullValueCausesException() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null)); - } - - @ParameterizedTest(name = "{index}: Adding {0}") - @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException") - @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE }) - public void testAddToSumWithNonPositiveValueCausesException(int value) - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value)); - } - - @ParameterizedTest(name = "{index}: Adding {0}") - @DisplayName("Can add a positive value to a sum") - @ValueSource(ints = { 1, 3, 6, 66, 7, 9 }) - public void testAddToSumWithPositiveValuePossible(int value) - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value)); - } - - @ParameterizedTest(name = "{index}: Summing up {0}") - @DisplayName("Adds up numbers correctly") - @MethodSource("numbersProvider") - public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers) - { - long expectedResult = Arrays.stream(numbers).sum(); - AdderBusinessLogic adder = new AdderBusinessLogic(); - Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number)); - AdderResult result = adder.calculate("foo"); - assertThat(result.number).isEqualTo(numbers[numbers.length-1]); - assertThat(result.sum).isEqualTo(expectedResult); - } - - static Stream numbersProvider() { - return Stream.of( - Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()), - Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()), - Arguments.of((Object) IntStream.rangeClosed(1,66).toArray())); - } -} diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationTests.java similarity index 62% rename from src/test/java/de/juplo/kafka/ApplicationIT.java rename to src/test/java/de/juplo/kafka/ApplicationTests.java index 4bb4f5b..3c77a44 100644 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -2,27 +2,18 @@ package de.juplo.kafka; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.boot.test.web.server.LocalServerPort; import org.springframework.kafka.test.context.EmbeddedKafka; -import static de.juplo.kafka.ApplicationIT.TOPIC; - @SpringBootTest( webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, - properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "sumup.adder.topic=" + TOPIC, - "spring.mongodb.embedded.version=4.4.13" }) -@EmbeddedKafka(topics = TOPIC) -@AutoConfigureDataMongo -public class ApplicationIT + properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}") +@EmbeddedKafka(topics = "out") +public class ApplicationTests { - public static final String TOPIC = "FOO"; - @LocalServerPort private int port; diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java deleted file mode 100644 index 52794ba..0000000 --- a/src/test/java/de/juplo/kafka/MessageTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package de.juplo.kafka; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.Arrays; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.*; - - -public class MessageTest -{ - ObjectMapper mapper = new ObjectMapper(); - - @Test - @DisplayName("Deserialize a MessageAddNumber message") - public void testDeserializeMessageAddNumber() - { - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class)); - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class)); - } - - @Test - @DisplayName("Deserialize a MessageCalculateSum message") - public void testDeserializeMessageCalculateSum() throws JsonProcessingException - { - Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class)); - Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class)); - } -} -- 2.20.1