streams - Übungen - Microservices - Schritt 02
[demos/microservices] / details / src / main / java / de / trion / microservices / details / DetailsService.java
diff --git a/details/src/main/java/de/trion/microservices/details/DetailsService.java b/details/src/main/java/de/trion/microservices/details/DetailsService.java
new file mode 100644 (file)
index 0000000..a1f8dbd
--- /dev/null
@@ -0,0 +1,115 @@
+package de.trion.microservices.details;
+
+
+import de.trion.microservices.avro.Order;
+import static org.springframework.http.MediaType.APPLICATION_JSON;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
+import java.net.URI;
+import java.util.Properties;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.StreamsMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+
+@RestController
+public class DetailsService
+{
+  final static Logger LOG = LoggerFactory.getLogger(DetailsService.class);
+
+  private final String topic;
+  private final String host;
+  private final int port;
+  private final KafkaStreams streams;
+
+
+  public DetailsService(ApplicationProperties config)
+  {
+    topic = config.topic;
+
+    String[] splitted = config.applicationServer.split(":");
+    host = splitted[0];
+    port = Integer.parseInt(splitted[1]);
+
+    Properties properties = new Properties();
+    properties.put("bootstrap.servers", config.bootstrapServers);
+    properties.put("application.id", "details");
+    properties.put("application.server", config.applicationServer);
+    properties.put("schema.registry.url", config.schemaRegistryUrl);
+    properties.put("default.key.serde", Serdes.String().getClass());
+    properties.put("default.value.serde", SpecificAvroSerde.class);
+
+    StreamsBuilder builder = new StreamsBuilder();
+    builder.table(topic, Materialized.as(topic));
+
+    Topology topology = builder.build();
+    streams = new KafkaStreams(topology, properties);
+    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+    {
+      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+      try
+      {
+        streams.close();
+      }
+      catch (Exception ex)
+      {
+        LOG.error("Could not close KafkaStreams!", ex);
+      }
+    });
+  }
+
+
+  @GetMapping(
+      path = "/orders/{id}",
+      produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
+  public ResponseEntity<?> getOrder(@PathVariable String id)
+  {
+    StreamsMetadata metadata = streams.metadataForKey(topic, id, Serdes.String().serializer());
+    LOG.debug("Local store for {}: {}:{}", id, metadata.host(), metadata.port());
+
+    if (port != metadata.port() || !host.equals(metadata.host()))
+    {
+      URI location = URI.create("http://" + metadata.host() + ":" + metadata.port() + "/" + id);
+      LOG.debug("Redirecting to {}", location);
+      return
+          ResponseEntity
+              .status(HttpStatus.TEMPORARY_REDIRECT)
+              .location(location)
+              .build();
+    }
+
+    ReadOnlyKeyValueStore<String, Order> orders;
+    orders = streams.store(topic, QueryableStoreTypes.keyValueStore());
+    Order order = orders.get(id);
+    return order == null
+        ? ResponseEntity.notFound().build()
+        : ResponseEntity.ok().contentType(APPLICATION_JSON).body(order.toString());
+  }
+
+
+  @PostConstruct
+  public void start()
+  {
+    streams.start();
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    streams.close();
+  }
+}