X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fmicroservices;a=blobdiff_plain;f=details%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fdetails%2FDetailsService.java;fp=details%2Fsrc%2Fmain%2Fjava%2Fde%2Ftrion%2Fmicroservices%2Fdetails%2FDetailsService.java;h=a1f8dbdd4388a8992d156a54c77a7fc6176339f9;hp=0000000000000000000000000000000000000000;hb=fa556c2fa0c9bb3574faed66d943e13350b4d860;hpb=10e9198de0f46c1ed7d9c574288eb7deb368ee68 diff --git a/details/src/main/java/de/trion/microservices/details/DetailsService.java b/details/src/main/java/de/trion/microservices/details/DetailsService.java new file mode 100644 index 0000000..a1f8dbd --- /dev/null +++ b/details/src/main/java/de/trion/microservices/details/DetailsService.java @@ -0,0 +1,115 @@ +package de.trion.microservices.details; + + +import de.trion.microservices.avro.Order; +import static org.springframework.http.MediaType.APPLICATION_JSON; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.net.URI; +import java.util.Properties; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.StreamsMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + + +@RestController +public class DetailsService +{ + final static Logger LOG = LoggerFactory.getLogger(DetailsService.class); + + private final String topic; + private final String host; + private final int port; + private final KafkaStreams streams; + + + public DetailsService(ApplicationProperties config) + { + topic = config.topic; + + String[] splitted = config.applicationServer.split(":"); + host = splitted[0]; + port = Integer.parseInt(splitted[1]); + + Properties properties = new Properties(); + properties.put("bootstrap.servers", config.bootstrapServers); + properties.put("application.id", "details"); + properties.put("application.server", config.applicationServer); + properties.put("schema.registry.url", config.schemaRegistryUrl); + properties.put("default.key.serde", Serdes.String().getClass()); + properties.put("default.value.serde", SpecificAvroSerde.class); + + StreamsBuilder builder = new StreamsBuilder(); + builder.table(topic, Materialized.as(topic)); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @GetMapping( + path = "/orders/{id}", + produces = MediaType.APPLICATION_JSON_UTF8_VALUE) + public ResponseEntity getOrder(@PathVariable String id) + { + StreamsMetadata metadata = streams.metadataForKey(topic, id, Serdes.String().serializer()); + LOG.debug("Local store for {}: {}:{}", id, metadata.host(), metadata.port()); + + if (port != metadata.port() || !host.equals(metadata.host())) + { + URI location = URI.create("http://" + metadata.host() + ":" + metadata.port() + "/" + id); + LOG.debug("Redirecting to {}", location); + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .location(location) + .build(); + } + + ReadOnlyKeyValueStore orders; + orders = streams.store(topic, QueryableStoreTypes.keyValueStore()); + Order order = orders.get(id); + return order == null + ? ResponseEntity.notFound().build() + : ResponseEntity.ok().contentType(APPLICATION_JSON).body(order.toString()); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(); + } +}