From fa556c2fa0c9bb3574faed66d943e13350b4d860 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 7 Jun 2020 11:56:12 +0200 Subject: [PATCH] =?utf8?q?streams=20-=20=C3=9Cbungen=20-=20Microservices?= =?utf8?q?=20-=20Schritt=2002=20--=20Details-Service=20f=C3=BCr=20Order-Au?= =?utf8?q?ftr=C3=A4ge=20implementiert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der Service lauscht auf dem "orders"-Topic und baut den Zustand auf * Über eine REST-Anfrage kann der aktuelle Zustand erfragt werden --- .gitignore | 2 + README.sh | 4 +- details/Dockerfile | 5 + details/order.avsc | 21 ++++ details/pom.xml | 80 ++++++++++++ .../microservices/details/Application.java | 22 ++++ .../details/ApplicationProperties.java | 55 +++++++++ .../microservices/details/DetailsService.java | 115 ++++++++++++++++++ .../src/main/resources/application.properties | 1 + docker-compose.yml | 10 ++ pom.xml | 3 +- take-order/pom.xml | 2 +- 12 files changed, 317 insertions(+), 3 deletions(-) create mode 100644 details/Dockerfile create mode 100644 details/order.avsc create mode 100644 details/pom.xml create mode 100644 details/src/main/java/de/trion/microservices/details/Application.java create mode 100644 details/src/main/java/de/trion/microservices/details/ApplicationProperties.java create mode 100644 details/src/main/java/de/trion/microservices/details/DetailsService.java create mode 100644 details/src/main/resources/application.properties diff --git a/.gitignore b/.gitignore index 4d57b56..df2dd73 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ take-order/src/main/java/de/trion/microservices/avro take-order/target +details/src/main/java/de/trion/microservices/avro +details/target diff --git a/README.sh b/README.sh index 9ee841e..b357ba3 100755 --- a/README.sh +++ b/README.sh @@ -12,6 +12,7 @@ trap 'kill $(jobs -p)' EXIT mvn package docker build -t trion/take-order-service:01 take-order +docker build -t trion/details-service:02 details docker-compose up -d zookeeper kafka schema-registry @@ -19,10 +20,11 @@ while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 100 kafka-topics --zookeeper zookeeper:2181 --if-not-exists --create --replication-factor 1 --partitions 5 --topic orders -docker-compose up -d take-order +docker-compose up -d take-order details kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic orders & while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for take-order..."; sleep 1; done +while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for details..."; sleep 1; done http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=5 http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity= http -v post 0:8091/orders Accept:*/* id=1 customerId=2 productId=234 quantity=-5 diff --git a/details/Dockerfile b/details/Dockerfile new file mode 100644 index 0000000..11a8724 --- /dev/null +++ b/details/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:8-jre-slim +COPY target/details-02-SNAPSHOT.jar /opt/ +EXPOSE 8080 +ENTRYPOINT [ "java", "-jar", "/opt/details-02-SNAPSHOT.jar" ] +CMD [] diff --git a/details/order.avsc b/details/order.avsc new file mode 100644 index 0000000..c857b50 --- /dev/null +++ b/details/order.avsc @@ -0,0 +1,21 @@ +[ + { + "namespace": "de.trion.microservices.avro", + "type": "enum", + "name": "OrderState", + "symbols" : [ "CREATED" ] + }, + { + "namespace": "de.trion.microservices.avro", + "type": "record", + "name": "Order", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "state", "type": "OrderState" }, + { "name": "customerId", "type": "long" }, + { "name": "orderId", "type": "long" }, + { "name": "productId", "type": "long" }, + { "name": "quantity", "type": "int" } + ] + } +] diff --git a/details/pom.xml b/details/pom.xml new file mode 100644 index 0000000..b5a6a74 --- /dev/null +++ b/details/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + + de.trion.kafka.microservices + order-example + 02 + + + de.trion.kafka.microservices + details + Order Details Service + 02-SNAPSHOT + + + 1.9.0 + 5.3.0 + 2.3.0 + + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.kafka + kafka-streams + + + org.apache.avro + avro + ${avro.version} + + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/ + ${project.basedir}/src/main/java/ + + + + + + + + diff --git a/details/src/main/java/de/trion/microservices/details/Application.java b/details/src/main/java/de/trion/microservices/details/Application.java new file mode 100644 index 0000000..f708485 --- /dev/null +++ b/details/src/main/java/de/trion/microservices/details/Application.java @@ -0,0 +1,22 @@ +package de.trion.microservices.details; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application +{ + private final static Logger LOG = LoggerFactory.getLogger(Application.class); + + + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} \ No newline at end of file diff --git a/details/src/main/java/de/trion/microservices/details/ApplicationProperties.java b/details/src/main/java/de/trion/microservices/details/ApplicationProperties.java new file mode 100644 index 0000000..306f161 --- /dev/null +++ b/details/src/main/java/de/trion/microservices/details/ApplicationProperties.java @@ -0,0 +1,55 @@ +package de.trion.microservices.details; + + +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("details") +public class ApplicationProperties +{ + String bootstrapServers = "kafka:9092"; + String schemaRegistryUrl = "http://schema-registry:8081"; + String topic = "orders"; + String applicationServer = "details:8092"; + + + public String getBootstrapServers() + { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) + { + this.bootstrapServers = bootstrapServers; + } + + public String getSchemaRegistryUrl() + { + return schemaRegistryUrl; + } + + public void setSchemaRegistryUrl(String schemaRegistryUrl) + { + this.schemaRegistryUrl = schemaRegistryUrl; + } + + public String getTopic() + { + return topic; + } + + public void setTopic(String topic) + { + this.topic = topic; + } + + public String getApplicationServer() + { + return applicationServer; + } + + public void setApplicationServer(String applicationServer) + { + this.applicationServer = applicationServer; + } +} diff --git a/details/src/main/java/de/trion/microservices/details/DetailsService.java b/details/src/main/java/de/trion/microservices/details/DetailsService.java new file mode 100644 index 0000000..a1f8dbd --- /dev/null +++ b/details/src/main/java/de/trion/microservices/details/DetailsService.java @@ -0,0 +1,115 @@ +package de.trion.microservices.details; + + +import de.trion.microservices.avro.Order; +import static org.springframework.http.MediaType.APPLICATION_JSON; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.net.URI; +import java.util.Properties; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.StreamsMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + + +@RestController +public class DetailsService +{ + final static Logger LOG = LoggerFactory.getLogger(DetailsService.class); + + private final String topic; + private final String host; + private final int port; + private final KafkaStreams streams; + + + public DetailsService(ApplicationProperties config) + { + topic = config.topic; + + String[] splitted = config.applicationServer.split(":"); + host = splitted[0]; + port = Integer.parseInt(splitted[1]); + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.bootstrapServers); + properties.put("application.id", "details"); + properties.put("application.server", config.applicationServer); + properties.put("schema.registry.url", config.schemaRegistryUrl); + properties.put("default.key.serde", Serdes.String().getClass()); + properties.put("default.value.serde", SpecificAvroSerde.class); + + StreamsBuilder builder = new StreamsBuilder(); + builder.table(topic, Materialized.as(topic)); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @GetMapping( + path = "/orders/{id}", + produces = MediaType.APPLICATION_JSON_UTF8_VALUE) + public ResponseEntity getOrder(@PathVariable String id) + { + StreamsMetadata metadata = streams.metadataForKey(topic, id, Serdes.String().serializer()); + LOG.debug("Local store for {}: {}:{}", id, metadata.host(), metadata.port()); + + if (port != metadata.port() || !host.equals(metadata.host())) + { + URI location = URI.create("http://" + metadata.host() + ":" + metadata.port() + "/" + id); + LOG.debug("Redirecting to {}", location); + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .location(location) + .build(); + } + + ReadOnlyKeyValueStore orders; + orders = streams.store(topic, QueryableStoreTypes.keyValueStore()); + Order order = orders.get(id); + return order == null + ? ResponseEntity.notFound().build() + : ResponseEntity.ok().contentType(APPLICATION_JSON).body(order.toString()); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(); + } +} diff --git a/details/src/main/resources/application.properties b/details/src/main/resources/application.properties new file mode 100644 index 0000000..ce81378 --- /dev/null +++ b/details/src/main/resources/application.properties @@ -0,0 +1 @@ +logging.level.de.trion=debug diff --git a/docker-compose.yml b/docker-compose.yml index 0e632a2..7714d3a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,16 @@ services: - kafka - schema-registry + details: + image: trion/details-service:02 + hostname: details + ports: + - "8092:8080" + depends_on: + - zookeeper + - kafka + - schema-registry + networks: default: external: diff --git a/pom.xml b/pom.xml index 478a9cc..88ebc8b 100644 --- a/pom.xml +++ b/pom.xml @@ -11,7 +11,7 @@ de.trion.kafka.microservices order-example Order Example - 01 + 02 pom @@ -22,6 +22,7 @@ take-order + details diff --git a/take-order/pom.xml b/take-order/pom.xml index 067a8da..f5ad727 100644 --- a/take-order/pom.xml +++ b/take-order/pom.xml @@ -5,7 +5,7 @@ de.trion.kafka.microservices order-example - 01 + 02 de.trion.kafka.microservices -- 2.20.1