From: Kai Moritz
Date: Sat, 13 Aug 2022 17:13:22 +0000 (+0200)
Subject: Implemented the `requests` service for the SumUp example
X-Git-Tag: sumup-requests---lvm-2-tage~4
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=41e5f74b40e4a434483dcc4142aaf8224ea5a478;p=demos%2Fkafka%2Ftraining

Implemented the `requests` service for the SumUp example
---

diff --git a/README.sh b/README.sh
index 900270a..9aa6ec2 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/endless-consumer:1.0-SNAPSHOT
+IMAGE=juplo/sumup-requests:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -9,14 +9,15 @@ then
   exit
 fi
 
-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
   "$1" = "build"
 ]]
 then
-  mvn install || exit
+  docker-compose rm -svf requests
+  mvn clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
@@ -25,7 +26,14 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d producer consumer
-sleep 15
-docker-compose stop producer consumer
-docker-compose logs consumer
+docker-compose up -d
+
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests..."; sleep 1; done
+
+
+echo 66 | http -v :8080/foo
+
+sleep 5
+
+kafkacat -b :9092 -t out -o 0 -e -f 'p=%p|o=%o|k=%k|v=%s\n'

diff --git a/docker-compose.yml b/docker-compose.yml
index f9135d8..3ae4b88 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -7,20 +7,56 @@ services:
     ports:
       - 2181:2181
 
-  kafka:
+  kafka-1:
     image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9081:9081
+    depends_on:
+      - zookeeper
+
+  kafka-2:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     ports:
       - 9092:9082
       - 9082:9082
+    networks:
+      default:
+        aliases:
+          - kafka
+    depends_on:
+      - zookeeper
+
+  kafka-3:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9083:9083
     depends_on:
       - zookeeper
 
@@ -28,31 +64,33 @@ services:
     image: juplo/toolbox
     command: >
       bash -c "
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out
+        kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 1 --replication-factor 1
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic in
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic out
       "
 
   cli:
     image: juplo/toolbox
     command: sleep infinity
 
-  producer:
-    image: juplo/endless-long-producer:1.0-SNAPSHOT
+  gateway:
+    image: juplo/sumup-gateway:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
       server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer
-      producer.topic: test
-      producer.throttle-ms: 200
-
+      sumup.gateway.bootstrap-server: kafka:9092
+      sumup.gateway.client-id: gateway
+      sumup.gateway.topic: in
 
-  consumer:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+  requests:
+    image: juplo/sumup-requests:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
       server.port: 8080
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: consumer
+      sumup.requests.bootstrap-server: kafka:9092
+      sumup.requests.client-id: requests

diff --git a/pom.xml b/pom.xml
index fe50bac..0c24f8b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,10 @@
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-consumer</artifactId>
+  <artifactId>sumup-requests</artifactId>
+  <name>SumUp-Requests</name>
+  <description>A service that reads computation requests from an incoming topic and generates corresponding messages for the SumUp-Consumer on an outgoing topic</description>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and print the topic</name>

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index d0334a2..753422e 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,8 +1,10 @@
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -17,25 +19,30 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public RecordHandler<String, Long> noopRecordHandler()
+  public SumUpRecordHandler sumUpRecordHandler(
+      KafkaProducer<String, String> kafkaProducer,
+      ApplicationProperties properties)
   {
-    return record -> {};
+    return new SumUpRecordHandler(
+        kafkaProducer,
+        properties.getClientId(),
+        properties.getTopicOut());
   }
 
   @Bean
-  public EndlessConsumer<String, Long> endlessConsumer(
-      KafkaConsumer<String, Long> kafkaConsumer,
+  public EndlessConsumer<String, Integer> endlessConsumer(
+      KafkaConsumer<String, Integer> kafkaConsumer,
       ExecutorService executor,
-      RecordHandler<String, Long> noopRecordHandler,
+      SumUpRecordHandler sumUpRecordHandler,
       ApplicationProperties properties)
   {
     return new EndlessConsumer<>(
         executor,
         properties.getClientId(),
-        properties.getTopic(),
+        properties.getTopicIn(),
         kafkaConsumer,
-        noopRecordHandler);
+        sumUpRecordHandler);
   }
 
   @Bean
@@ -45,7 +52,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Integer> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
@@ -57,8 +64,26 @@ public class ApplicationConfiguration
     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", LongDeserializer.class.getName());
+    props.put("value.deserializer", IntegerDeserializer.class.getName());
 
     return new KafkaConsumer<>(props);
   }
+
+  @Bean(destroyMethod = "close")
+  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("client.id", properties.getClientId());
+    props.put("acks", properties.getAcks());
+    props.put("batch.size", properties.getBatchSize());
+    props.put("delivery.timeout.ms", 20000); // 20 seconds
+    props.put("request.timeout.ms", 10000); // 10 seconds
+    props.put("linger.ms", properties.getLingerMs());
+    props.put("compression.type", properties.getCompressionType());
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    return new KafkaProducer<>(props);
+  }
 }

diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index dc3a26e..c4df59c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Long> consumer;
+  private final EndlessConsumer<String, Integer> consumer;
 
 
   @Override

diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index 14e928f..ccddc81 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull;
 import java.time.Duration;
 
 
-@ConfigurationProperties(prefix = "consumer")
+@ConfigurationProperties(prefix = "sumup.requests")
 @Validated
 @Getter
 @Setter
@@ -27,10 +27,23 @@ public class ApplicationProperties
   private String clientId;
   @NotNull
   @NotEmpty
-  private String topic;
+  private String topicIn;
   @NotNull
   @NotEmpty
   private String autoOffsetReset;
   @NotNull
   private Duration commitInterval;
+  @NotNull
+  @NotEmpty
+  private String topicOut;
+  @NotNull
+  @NotEmpty
+  private String acks;
+  @NotNull
+  private Integer batchSize;
+  @NotNull
+  private Integer lingerMs;
+  @NotNull
+  @NotEmpty
+  private String compressionType;
 }

diff --git a/src/main/java/de/juplo/kafka/SumUpRecordHandler.java b/src/main/java/de/juplo/kafka/SumUpRecordHandler.java
new file mode 100644
index 0000000..5d15b3b
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/SumUpRecordHandler.java
@@ -0,0 +1,82 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class SumUpRecordHandler implements RecordHandler<String, Integer>
+{
+  private final Producer<String, String> producer;
+  private final String id;
+  private final String topic;
+
+
+  @Override
+  public void accept(ConsumerRecord<String, Integer> record)
+  {
+    String key = record.key();
+    int number = record.value();
+
+    send(key, "START");
+    for (int i = 1; i <= number; i++)
+    {
+      send(key, Integer.toString(i));
+    }
+    send(key, "END");
+  }
+
+  private void send(String key, String value)
+  {
+    final long time = System.currentTimeMillis();
+
+    final ProducerRecord<String, String> record = new ProducerRecord<>(
+        topic,  // Topic
+        key,    // Key
+        value   // Value
+    );
+
+    producer.send(record, (metadata, e) ->
+    {
+      long now = System.currentTimeMillis();
+      if (e == null)
+      {
+        // HANDLE SUCCESS
+        log.debug(
+            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+            id,
+            record.key(),
+            record.value(),
+            metadata.partition(),
+            metadata.offset(),
+            metadata.timestamp(),
+            now - time
+        );
+      }
+      else
+      {
+        // HANDLE ERROR
+        log.error(
+            "{} - ERROR key={} timestamp={} latency={}ms: {}",
+            id,
+            record.key(),
+            metadata == null ? -1 : metadata.timestamp(),
+            now - time,
+            e.toString()
+        );
+      }
+    });
+
+    long now = System.currentTimeMillis();
+    log.trace(
+        "{} - Queued message with key={} latency={}ms",
+        id,
+        record.key(),
+        now - time
+    );
+  }
+}

diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index f8bfe7e..4a5477f 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,10 +1,16 @@
-consumer:
-  bootstrap-server: :9092
-  group-id: my-group
-  client-id: DEV
-  topic: test
-  auto-offset-reset: earliest
-  commit-interval: 5s
+sumup:
+  requests:
+    bootstrap-server: :9092
+    group-id: my-group
+    client-id: DEV
+    topic-in: in
+    topic-out: out
+    auto-offset-reset: earliest
+    commit-interval: 5s
+    acks: -1
+    batch-size: 16384
+    linger-ms: 0
+    compression-type: gzip
 management:
   endpoint:
     shutdown:

diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java
index ea1e43c..d1c8371 100644
--- a/src/test/java/de/juplo/kafka/ApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/ApplicationIT.java
@@ -7,16 +7,16 @@ import org.springframework.boot.test.web.client.TestRestTemplate;
 import org.springframework.boot.test.web.server.LocalServerPort;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.INPUT_TOPIC;
 
 
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
-        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "consumer.topic=" + TOPIC,
+        "sumup.requests.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "sumup.requests.topic-in=" + INPUT_TOPIC,
         "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC)
+@EmbeddedKafka(topics = INPUT_TOPIC)
 public class ApplicationIT
 {
   public static final String TOPIC = "FOO";

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 50627da..491335a 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -28,8 +28,7 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.*;
 import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;
 
 
@@ -38,15 +37,16 @@ import static org.awaitility.Awaitility.*;
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestPropertySource(
     properties = {
-        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "consumer.topic=" + TOPIC,
-        "consumer.commit-interval=1s" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+        "sumup.requests.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "sumup.requests.topic-in=" + INPUT_TOPIC,
+        "sumup.requests.commit-interval=1s" })
+@EmbeddedKafka(topics = { INPUT_TOPIC, OUTPUT_TOPIC }, partitions = PARTITIONS)
 @EnableAutoConfiguration
 @Slf4j
 class ApplicationTests
 {
-  public static final String TOPIC = "FOO";
+  public static final String INPUT_TOPIC = "FOO";
+  public static final String OUTPUT_TOPIC = "BAR";
   public static final int PARTITIONS = 10;
 
 
@@ -55,9 +55,9 @@
   @Autowired
   Serializer valueSerializer;
   @Autowired
-  KafkaProducer<String, Bytes> kafkaProducer;
+  KafkaProducer<String, Bytes> testProducer;
   @Autowired
-  KafkaConsumer<String, Long> kafkaConsumer;
+  KafkaConsumer<String, Integer> kafkaConsumer;
   @Autowired
   KafkaConsumer<Bytes, Bytes> offsetConsumer;
   @Autowired
@@ -67,10 +67,10 @@
   @Autowired
   RecordHandler noopRecordHandler;
 
-  EndlessConsumer<String, Long> endlessConsumer;
+  EndlessConsumer<String, Integer> endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
   Map<TopicPartition, Long> newOffsets;
-  Set<ConsumerRecord<String, Long>> receivedRecords;
+  Set<ConsumerRecord<String, Integer>> receivedRecords;
 
 
   /** Tests methods */
@@ -80,8 +80,8 @@
   {
     send100Messages((partition, key, counter) ->
     {
-      Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
-      return new ProducerRecord<>(TOPIC, partition, key, value);
+      Bytes value = new Bytes(valueSerializer.serialize(INPUT_TOPIC, counter));
+      return new ProducerRecord<>(INPUT_TOPIC, partition, key, value);
     });
 
     await("100 records received")
@@ -109,9 +109,9 @@
     send100Messages((partition, key, counter) ->
     {
       Bytes value = counter == 77
-          ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-          : new Bytes(valueSerializer.serialize(TOPIC, counter));
-      return new ProducerRecord<>(TOPIC, partition, key, value);
+          ? new Bytes(stringSerializer.serialize(INPUT_TOPIC, "BOOM!"))
+          : new Bytes(valueSerializer.serialize(INPUT_TOPIC, counter));
+      return new ProducerRecord<>(INPUT_TOPIC, partition, key, value);
     });
 
     await("Consumer failed")
@@ -206,19 +206,19 @@
     return IntStream
         .range(0, PARTITIONS)
-        .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+        .mapToObj(partition -> new TopicPartition(INPUT_TOPIC, partition))
         .collect(Collectors.toList());
   }
 
 
   public interface RecordGenerator
   {
-    public ProducerRecord<String, Bytes> generate(int partition, String key, long counter);
+    public ProducerRecord<String, Bytes> generate(int partition, String key, int counter);
   }
 
   void send100Messages(RecordGenerator recordGenerator)
   {
-    long i = 0;
+    int i = 0;
 
     for (int partition = 0; partition < 10; partition++)
     {
@@ -227,7 +227,7 @@
         ProducerRecord<String, Bytes> record =
             recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);
 
-        kafkaProducer.send(record, (metadata, e) ->
+        testProducer.send(record, (metadata, e) ->
         {
           if (metadata != null)
           {
@@ -267,10 +267,10 @@
       newOffsets.put(tp, offset - 1);
     });
 
-    TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
-        new TestRecordHandler<String, Long>(noopRecordHandler) {
+    TestRecordHandler<String, Integer> captureOffsetAndExecuteTestHandler =
+        new TestRecordHandler<String, Integer>(noopRecordHandler) {
       @Override
-      public void onNewRecord(ConsumerRecord<String, Long> record)
+      public void onNewRecord(ConsumerRecord<String, Integer> record)
       {
         newOffsets.put(
             new TopicPartition(record.topic(), record.partition()),
@@ -283,7 +283,7 @@
         new EndlessConsumer<>(
             executor,
             properties.getClientId(),
-            properties.getTopic(),
+            properties.getTopicIn(),
             kafkaConsumer,
             captureOffsetAndExecuteTestHandler);
 
@@ -309,13 +309,13 @@
   public static class Configuration
   {
     @Bean
-    Serializer<Long> serializer()
+    Serializer<Integer> valueSerializer()
     {
-      return new LongSerializer();
+      return new IntegerSerializer();
     }
 
     @Bean
-    KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+    KafkaProducer<String, Bytes> testProducer(ApplicationProperties properties)
     {
       Properties props = new Properties();
       props.put("bootstrap.servers", properties.getBootstrapServer());
@@ -338,5 +338,16 @@
       return new KafkaConsumer<>(props);
     }
+
+    @Bean
+    KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+    {
+      Properties props = new Properties();
+      props.put("bootstrap.servers", properties.getBootstrapServer());
+      props.put("key.serializer", StringSerializer.class.getName());
+      props.put("value.serializer", StringSerializer.class.getName());
+
+      return new KafkaProducer<>(props);
+    }
   }
 }
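
Note: the following sketch is not part of the commit. It shows how the SumUpRecordHandler from the patch can be exercised in isolation, without a broker, using the MockProducer that ships with kafka-clients. The class name and the sample request (key "foo", value 3) are made up for illustration; the handler constructor and the topic names follow the diff above.

    package de.juplo.kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SumUpRecordHandlerSketch
    {
      public static void main(String[] args)
      {
        // autoComplete=true makes MockProducer fire the send-callback synchronously
        MockProducer<String, String> producer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        SumUpRecordHandler handler = new SumUpRecordHandler(producer, "sketch", "out");

        // Simulate one consumed request, as the gateway would write it to the in topic
        handler.accept(new ConsumerRecord<>("in", 0, 0, "foo", 3));

        // Prints the produced sequence: START, 1, 2, 3, END - all with key "foo"
        producer.history().forEach(record ->
            System.out.println(record.key() + " -> " + record.value()));
      }
    }

Against the real cluster started by docker-compose.yml, the same sequence can be observed on the out topic via the kafkacat call at the end of README.sh.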