streams - Übungen - Microservices - Schritt 02
[demos/microservices] / details / src / main / java / de / trion / microservices / details / DetailsService.java
1 package de.trion.microservices.details;
2
3
4 import de.trion.microservices.avro.Order;
5 import static org.springframework.http.MediaType.APPLICATION_JSON;
6 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
7 import java.net.URI;
8 import java.util.Properties;
9 import javax.annotation.PostConstruct;
10 import javax.annotation.PreDestroy;
11 import org.apache.kafka.common.serialization.Serdes;
12 import org.apache.kafka.streams.KafkaStreams;
13 import org.apache.kafka.streams.StreamsBuilder;
14 import org.apache.kafka.streams.Topology;
15 import org.apache.kafka.streams.kstream.Materialized;
16 import org.apache.kafka.streams.state.QueryableStoreTypes;
17 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
18 import org.apache.kafka.streams.state.StreamsMetadata;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import org.springframework.http.HttpStatus;
22 import org.springframework.http.MediaType;
23 import org.springframework.http.ResponseEntity;
24 import org.springframework.web.bind.annotation.GetMapping;
25 import org.springframework.web.bind.annotation.PathVariable;
26 import org.springframework.web.bind.annotation.RestController;
27
28
29 @RestController
30 public class DetailsService
31 {
32   final static Logger LOG = LoggerFactory.getLogger(DetailsService.class);
33
34   private final String topic;
35   private final String host;
36   private final int port;
37   private final KafkaStreams streams;
38
39   private ReadOnlyKeyValueStore<String, Order> orders;
40
41
42   public DetailsService(ApplicationProperties config)
43   {
44     topic = config.topic;
45
46     String[] splitted = config.applicationServer.split(":");
47     host = splitted[0];
48     port = Integer.parseInt(splitted[1]);
49
50     Properties properties = new Properties();
51     properties.put("bootstrap.servers", config.bootstrapServers);
52     properties.put("application.id", "details");
53     properties.put("application.server", config.applicationServer);
54     properties.put("schema.registry.url", config.schemaRegistryUrl);
55     properties.put("default.key.serde", Serdes.String().getClass());
56     properties.put("default.value.serde", SpecificAvroSerde.class);
57
58     StreamsBuilder builder = new StreamsBuilder();
59     builder.table(topic, Materialized.as(topic));
60
61     Topology topology = builder.build();
62     streams = new KafkaStreams(topology, properties);
63     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
64     {
65       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
66       try
67       {
68         streams.close();
69       }
70       catch (Exception ex)
71       {
72         LOG.error("Could not close KafkaStreams!", ex);
73       }
74     });
75     streams.setStateListener((newState, oldState) ->
76     {
77       if (newState == KafkaStreams.State.RUNNING)
78         orders = streams.store(topic, QueryableStoreTypes.keyValueStore());
79     });
80   }
81
82
83   @GetMapping(
84       path = "/orders/{id}",
85       produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
86   public ResponseEntity<?> getOrder(@PathVariable String id)
87   {
88     StreamsMetadata metadata = streams.metadataForKey(topic, id, Serdes.String().serializer());
89     LOG.debug("Local store for {}: {}:{}", id, metadata.host(), metadata.port());
90
91     if (port != metadata.port() || !host.equals(metadata.host()))
92     {
93       URI location = URI.create("http://" + metadata.host() + ":" + metadata.port() + "/" + id);
94       LOG.debug("Redirecting to {}", location);
95       return
96           ResponseEntity
97               .status(HttpStatus.TEMPORARY_REDIRECT)
98               .location(location)
99               .build();
100     }
101
102     Order order = orders.get(id);
103     return order == null
104         ? ResponseEntity.notFound().build()
105         : ResponseEntity.ok().contentType(APPLICATION_JSON).body(order.toString());
106   }
107
108
109   @PostConstruct
110   public void start()
111   {
112     streams.start();
113   }
114
115   @PreDestroy
116   public void stop()
117   {
118     streams.close();
119   }
120 }