From: Kai Moritz Date: Sun, 14 Aug 2022 18:52:49 +0000 (+0200) Subject: Verbesserte Tests und Korrekturen gemerged: sumup-adder -> stored-offsets X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=f1eec82fb197f9fc7906eb9a90d75468e9e4356f;hp=a2e8fc924e5b472d6b90c42d311514f91ea452f1;p=demos%2Fkafka%2Ftraining Verbesserte Tests und Korrekturen gemerged: sumup-adder -> stored-offsets --- diff --git a/README.sh b/README.sh index 2845ab1..133af42 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/sumup-adder:1.0-SNAPSHOT +IMAGE=juplo/endless-consumer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -9,14 +9,14 @@ then exit fi -docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express +docker-compose up -d zookeeper kafka cli mongo express if [[ $(docker image ls -q $IMAGE) == "" || "$1" = "build" ]] then - docker-compose rm -svf adder + docker-compose rm -svf peter beate mvn clean install || exit else echo "Using image existing images:" @@ -26,19 +26,16 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d gateway requests adder +docker-compose up -d producer peter beate -while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done -while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests..."; sleep 1; done -while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder..."; sleep 1; done +sleep 15 -echo 66 | http -v :8080/foo -echo 666 | http -v :8080/bar +http -v post :8082/stop +sleep 10 +docker-compose kill -s 9 peter +http -v post :8082/start +sleep 60 -sleep 5 - -http -v :8082/state -http -v :8082/state/foo -http -v :8082/state/bar - -docker-compose logs adder +docker-compose stop producer peter beate +docker-compose logs beate +docker-compose logs --tail=10 peter diff --git a/docker-compose.yml b/docker-compose.yml index fec5bca..7ab77b2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,56 +7,20 @@ services: ports: - 2181:2181 - kafka-1: + kafka: image: confluentinc/cp-kafka:7.1.3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081 - KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081 - KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - ports: - - 9081:9081 - depends_on: - - zookeeper - - kafka-2: - image: confluentinc/cp-kafka:7.1.3 - environment: - KAFKA_BROKER_ID: 2 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082 - KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082 + KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082 KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" ports: - 9092:9082 - 9082:9082 - networks: - default: - aliases: - - kafka - depends_on: - - zookeeper - - kafka-3: - image: confluentinc/cp-kafka:7.1.3 - environment: - KAFKA_BROKER_ID: 3 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083 - KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083 - KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" - ports: - - 9083:9083 depends_on: - zookeeper @@ -83,44 +47,46 @@ services: image: juplo/toolbox command: > bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out - kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2 - kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 1 --replication-factor 1 - kafka-topics --bootstrap-server kafka:9092 --describe --topic in - kafka-topics --bootstrap-server kafka:9092 --describe --topic out + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test + kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 " cli: image: juplo/toolbox command: sleep infinity - gateway: - image: juplo/sumup-gateway:1.0-SNAPSHOT + producer: + image: juplo/endless-long-producer:1.0-SNAPSHOT ports: - 8080:8080 environment: server.port: 8080 - sumup.gateway.bootstrap-server: kafka:9092 - sumup.gateway.client-id: gateway - sumup.gateway.topic: in + producer.bootstrap-server: kafka:9092 + producer.client-id: producer + producer.topic: test + producer.throttle-ms: 500 + - requests: - image: juplo/sumup-requests:1.0-SNAPSHOT + peter: + image: juplo/endless-consumer:1.0-SNAPSHOT ports: - 8081:8080 environment: server.port: 8080 - sumup.requests.bootstrap-server: kafka:9092 - sumup.requests.client-id: requests + consumer.bootstrap-server: kafka:9092 + consumer.client-id: peter + consumer.topic: test + spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 + spring.data.mongodb.database: juplo - adder: - image: juplo/sumup-adder:1.0-SNAPSHOT + beate: + image: juplo/endless-consumer:1.0-SNAPSHOT ports: - 8082:8080 environment: server.port: 8080 - sumup.adder.bootstrap-server: kafka:9092 - sumup.adder.client-id: adder + consumer.bootstrap-server: kafka:9092 + consumer.client-id: beate + consumer.topic: test spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 spring.data.mongodb.database: juplo diff --git a/pom.xml b/pom.xml index ecb559a..fa78c70 100644 --- a/pom.xml +++ b/pom.xml @@ -12,10 +12,9 @@ de.juplo.kafka - sumup-adder + endless-consumer 1.0-SNAPSHOT - SumUp Adder - Calculates the sum for the send messages + Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic 11 @@ -71,11 +70,6 @@ de.flapdoodle.embed.mongo test - - org.assertj - assertj-core - test - diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java deleted file mode 100644 index 1f3d9aa..0000000 --- a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java +++ /dev/null @@ -1,61 +0,0 @@ -package de.juplo.kafka; - - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -public class AdderBusinessLogic -{ - private final Map state; - - - public AdderBusinessLogic() - { - this(new HashMap<>()); - } - - public AdderBusinessLogic(Map state) - { - this.state = state; - } - - - public synchronized void startSum(String user) - { - if (state.containsKey(user)) - throw new IllegalStateException("Sumation for " + user + " already in progress, state: " + state.get(user)); - - state.put(user, 0l); - } - - public synchronized Optional getSum(String user) - { - return Optional.ofNullable(state.get(user)); - } - - public synchronized void addToSum(String user, Integer value) - { - if (!state.containsKey(user)) - throw new IllegalStateException("No sumation for " + user + " in progress"); - if (value == null || value < 1) - throw new IllegalArgumentException("Not a positive number: " + value); - - long result = state.get(user) + value; - state.put(user, result); - } - - public synchronized Long endSum(String user) - { - if (!state.containsKey(user)) - throw new IllegalStateException("No sumation for " + user + " in progress"); - - return state.remove(user); - } - - protected Map getState() - { - return state; - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 9f54083..a9d9b15 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -27,7 +28,7 @@ public class ApplicationConfiguration public ApplicationRebalanceListener rebalanceListener( ApplicationRecordHandler recordHandler, StateRepository stateRepository, - Consumer consumer, + Consumer consumer, ApplicationProperties properties) { return new ApplicationRebalanceListener( @@ -41,8 +42,8 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( - KafkaConsumer kafkaConsumer, + public EndlessConsumer endlessConsumer( + KafkaConsumer kafkaConsumer, ExecutorService executor, ApplicationRebalanceListener rebalanceListener, ApplicationRecordHandler recordHandler, @@ -65,7 +66,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { Properties props = new Properties(); @@ -75,9 +76,10 @@ public class ApplicationConfiguration props.put("client.id", properties.getClientId()); props.put("enable.auto.commit", false); props.put("auto.offset.reset", properties.getAutoOffsetReset()); + props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", LongDeserializer.class.getName()); return new KafkaConsumer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index df4e653..dc3a26e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; + private final EndlessConsumer consumer; @Override diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 410c623..14e928f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull; import java.time.Duration; -@ConfigurationProperties(prefix = "sumup.adder") +@ConfigurationProperties(prefix = "consumer") @Validated @Getter @Setter diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 542af2d..444b7b7 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -22,7 +22,7 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe private final String topic; private final Clock clock; private final Duration commitInterval; - private final Consumer consumer; + private final Consumer consumer; private Instant lastCommit = Instant.EPOCH; private boolean commitsEnabled = true; @@ -85,10 +85,10 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { log.debug("Storing data and offsets, last commit: {}", lastCommit); - recordHandler.getState().forEach((partiton, adder) -> stateRepository.save( + recordHandler.getState().forEach((partiton, state) -> stateRepository.save( new StateDocument( partiton, - adder.getState(), + state, consumer.position(new TopicPartition(topic, partiton))))); lastCommit = clock.instant(); } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index d0d385c..c2c2657 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -8,46 +8,38 @@ import java.util.Map; @Slf4j -public class ApplicationRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { - private final Map state = new HashMap<>(); + private final Map> state = new HashMap<>(); @Override - public void accept(ConsumerRecord record) + public void accept(ConsumerRecord record) { Integer partition = record.partition(); - String user = record.key(); - String message = record.value(); - switch (message) - { - case "START": - state.get(partition).startSum(user); - break; - - case "END": - Long result = state.get(partition).endSum(user); - log.info("New result for {}: {}", user, result); - break; - - default: - state.get(partition).addToSum(user, Integer.parseInt(message)); - break; - } + String key = record.key() == null ? "NULL" : record.key().toString(); + Map byKey = state.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0l); + + long seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } protected void addPartition(Integer partition, Map state) { - this.state.put(partition, new AdderBusinessLogic(state)); + this.state.put(partition, state); } protected Map removePartition(Integer partition) { - return this.state.remove(partition).getState(); + return this.state.remove(partition); } - public Map getState() + public Map> getState() { return state; } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index d389271..09fb762 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -2,13 +2,10 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; @RestController @@ -35,27 +32,7 @@ public class DriverController @GetMapping("state") public Map> state() { - return - recordHandler - .getState() - .entrySet() - .stream() - .collect(Collectors.toMap( - entry -> entry.getKey(), - entry -> entry.getValue().getState())); - } - - @GetMapping("state/{user}") - public ResponseEntity seen(@PathVariable String user) - { - for (AdderBusinessLogic adder : recordHandler.getState().values()) - { - Optional sum = adder.getSum(user); - if (sum.isPresent()) - return ResponseEntity.ok(sum.get()); - } - - return ResponseEntity.notFound().build(); + return recordHandler.getState(); } diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index 0540e3f..bb1c701 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -27,10 +27,7 @@ public class StateDocument this.state = new HashMap<>(); } - public StateDocument( - Integer partition, - Map state, - long offset) + public StateDocument(Integer partition, Map state, long offset) { this.id = Integer.toString(partition); this.state = state; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 26948f5..fc1c68a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,10 @@ -sumup: - adder: - bootstrap-server: :9092 - group-id: my-group - client-id: DEV - topic: out - auto-offset-reset: earliest - commit-interval: 5s +consumer: + bootstrap-server: :9092 + group-id: my-group + client-id: DEV + topic: test + auto-offset-reset: earliest + commit-interval: 5s management: endpoint: shutdown: diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java deleted file mode 100644 index 435f036..0000000 --- a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java +++ /dev/null @@ -1,152 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.Arrays; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.*; - - -public class AdderBusinessLogicTest -{ - @Test - @DisplayName("A new sum can be started, if it does not exist") - public void testStartSumPossibleIfNotExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatNoException().isThrownBy(() -> adder.startSum("foo")); - } - - @Test - @DisplayName("Starting an already existing sum again, causes an IllegalStateException") - public void testStartSumCausesExceptionIfExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThatIllegalStateException().isThrownBy(() -> adder.startSum("foo")); - } - - @Test - @DisplayName("An empty Optional should be returned, for a non-existing sum") - public void testGetSumReturnsEmptyOptionalForNonExistingSum() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThat(adder.getSum("foo")).isEmpty(); - } - - @Test - @DisplayName("A non-empty Optional should be returned, for an existing sum") - public void testGetSumReturnsNonEmptyOptionalForExistingSum() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThat(adder.getSum("foo")).isNotEmpty(); - } - - @Test - @DisplayName("A sum can be ended, if it does exist") - public void testEndSumPossibleIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThatNoException().isThrownBy(() -> adder.endSum("foo")); - } - - @Test - @DisplayName("An existing sum is removed, if ended") - public void testEndSumRemovesSumIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - adder.endSum("foo"); - assertThat(adder.getSum("foo")).isEmpty(); - } - - @Test - @DisplayName("An existing Sum returns a non-null value, if ended") - public void testEndSumReturnsNonNullValueIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThat(adder.endSum("foo")).isNotNull(); - } - - @Test - @DisplayName("An existing Sum returns a non-negative value, if ended") - public void testEndSumReturnsNonNegativeValueIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThat(adder.endSum("foo")).isNotNegative(); - } - - @Test - @DisplayName("Ending a non-existing sum, causes an IllegalStateException") - public void testEndSumCausesExceptionIfNotExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalStateException().isThrownBy(() -> adder.endSum("foo")); - } - - @Test - @DisplayName("Adding to a non-existent sum causes an IllegalStateException") - public void testAddToSumCausesExceptionIfNotExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalStateException().isThrownBy(() -> adder.addToSum("foo", 1)); - } - - @Test - @DisplayName("Adding a null-value to an existing sum causes an IllegalArgumentException") - public void testAddSumWithNullValueToExistingSumCausesException() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null)); - } - - @ParameterizedTest(name = "{index}: Adding {0}") - @DisplayName("Adding a non-positive value to an existing sum causes an IllegalArgumentException") - @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE }) - public void testAddSumWithNonPositiveValueToExistingSumCausesException(int value) - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value)); - } - - @Test - @DisplayName("Can add a positive value to an existing sum") - public void testAddSumWithPositiveValuePossibleIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", -1)); - } - - @ParameterizedTest(name = "{index}: Summing up {0}") - @DisplayName("Adds up numbers correctly") - @MethodSource("numbersProvider") - public void testAddSumAddsUpNumbersCorrectlyIfSumExists(int... numbers) - { - long expectedResult = Arrays.stream(numbers).sum(); - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.startSum("foo"); - Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number)); - assertThat(adder.endSum("foo")).isEqualTo(expectedResult); - } - - static Stream numbersProvider() { - return Stream.of( - Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()), - Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()), - Arguments.of((Object) IntStream.rangeClosed(1,66).toArray())); - } -} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 4ddf8a9..5166227 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,31 +1,28 @@ package de.juplo.kafka; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.test.context.ContextConfiguration; import java.util.function.Consumer; -import java.util.stream.IntStream; -public class ApplicationTests extends GenericApplicationTests +@ContextConfiguration(classes = ApplicationTests.Configuration.class) +public class ApplicationTests extends GenericApplicationTests { public ApplicationTests() { super( new RecordGenerator() { - final int[] numbers = { 1, 7, 3, 2, 33, 6, 11 }; - final String[] dieWilden13 = - IntStream - .range(1,14) - .mapToObj(i -> "seeräuber-" + i) - .toArray(i -> new String[i]); final StringSerializer stringSerializer = new StringSerializer(); - final Bytes startMessage = new Bytes(stringSerializer.serialize(TOPIC, "START")); - final Bytes endMessage = new Bytes(stringSerializer.serialize(TOPIC, "END")); - - int counter = 0; + final LongSerializer longSerializer = new LongSerializer(); @Override @@ -34,51 +31,62 @@ public class ApplicationTests extends GenericApplicationTests boolean logicErrors, Consumer> messageSender) { - counter = 0; + int i = 0; - for (int i = 0; i < 33; i++) + for (int partition = 0; partition < 10; partition++) { - String seeräuber = dieWilden13[i%13]; - int number = numbers[i%7]; - - Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber)); - - send(key, startMessage, logicErrors, messageSender); - for (int message = 1; message <= number; message++) + for (int key = 0; key < 10; key++) { - Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message))); - send(key, value, logicErrors, messageSender); - } - send(key, endMessage, logicErrors, messageSender); - } + i++; - return counter; - } + Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i)); + if (i == 77) + { + if (logicErrors) + { + value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE)); + } + if (poisonPills) + { + value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!")); + } + } - void send( - Bytes key, - Bytes value, - boolean logicErrors, - Consumer> messageSender) - { - counter++; + ProducerRecord record = + new ProducerRecord<>( + TOPIC, + partition, + new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))), + value); - if (counter == 77) - { - if (logicErrors) - { - value = value.equals(startMessage) ? endMessage : startMessage; + messageSender.accept(record); } } - messageSender.accept(new ProducerRecord<>(TOPIC, key, value)); - } - - @Override - public boolean canGeneratePoisonPill() - { - return false; + return i; } }); } + + + @TestConfiguration + public static class Configuration + { + @Primary + @Bean + public ApplicationRecordHandler recordHandler() + { + ApplicationRecordHandler recordHandler = new ApplicationRecordHandler(); + return new ApplicationRecordHandler() + { + @Override + public void accept(ConsumerRecord record) + { + if (record.value() == Long.MIN_VALUE) + throw new RuntimeException("BOOM (Logic-Error)!"); + super.accept(record); + } + }; + } + } } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 9a6f812..fa3d911 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -37,9 +37,9 @@ import static org.awaitility.Awaitility.*; @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) @TestPropertySource( properties = { - "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}", - "sumup.adder.topic=" + TOPIC, - "sumup.adder.commit-interval=1s", + "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", + "consumer.topic=" + TOPIC, + "consumer.commit-interval=1s", "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration