efa28ca69fd18b5f7248b2c4f1a09b7c3304a455
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / query / QueryStreamProcessor.java
1 package de.juplo.kafka.wordcount.query;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import jakarta.annotation.PostConstruct;
6 import jakarta.annotation.PreDestroy;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.common.serialization.Serdes;
9 import org.apache.kafka.streams.*;
10 import org.apache.kafka.streams.kstream.KStream;
11 import org.apache.kafka.streams.kstream.KTable;
12 import org.apache.kafka.streams.kstream.Materialized;
13 import org.apache.kafka.streams.state.HostInfo;
14 import org.apache.kafka.streams.state.QueryableStoreTypes;
15 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
16
17 import java.net.URI;
18 import java.util.Optional;
19 import java.util.Properties;
20
21
22 @Slf4j
23 public class QueryStreamProcessor
24 {
25         public static final String STORE_NAME = "rankings-by-username";
26
27         public final KafkaStreams streams;
28         public final HostInfo hostInfo;
29         public final StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> storeParameters;
30         public final ObjectMapper mapper;
31
32
33         public QueryStreamProcessor(
34                         Properties props,
35                         HostInfo applicationServer,
36                         String usersInputTopic,
37                         String rankingInputTopic,
38                         ObjectMapper mapper)
39         {
40                 Topology topology = buildTopology(usersInputTopic, rankingInputTopic, mapper);
41                 streams = new KafkaStreams(topology, props);
42                 hostInfo = applicationServer;
43                 storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());;
44                 this.mapper = mapper;
45         }
46
47         static Topology buildTopology(
48                         String usersInputTopic,
49                         String rankingInputTopic,
50                         ObjectMapper mapper)
51         {
52                 StreamsBuilder builder = new StreamsBuilder();
53
54                 KTable<String, String> users = builder.table(usersInputTopic);
55                 KStream<String, String> rankings = builder.stream(rankingInputTopic);
56
57                 rankings
58                                 .join(users, (rankingJson, userJson) ->
59                                 {
60                                         try
61                                         {
62                                                 Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
63                                                 User user = mapper.readValue(userJson, User.class);
64
65                                                 return mapper.writeValueAsString(
66                                                                 UserRanking.of(
67                                                                                 user.getFirstName(),
68                                                                                 user.getLastName(),
69                                                                                 ranking.getEntries()));
70                                         }
71                                         catch (JsonProcessingException e)
72                                         {
73                                                 throw new RuntimeException(e);
74                                         }
75                                 })
76                                 .toTable(Materialized.as(STORE_NAME));
77
78                 Topology topology = builder.build();
79                 log.info("\n\n{}", topology.describe());
80
81                 return topology;
82         }
83
84         public Optional<URI> getRedirect(String username)
85         {
86                 KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer());
87                 HostInfo activeHost = metadata.activeHost();
88                 log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
89
90                 if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
91                 {
92                         return Optional.empty();
93                 }
94
95                 URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username);
96                 log.debug("Redirecting to {}", location);
97                 return Optional.of(location);
98         }
99
100         public Optional<UserRanking> getUserRanking(String username)
101         {
102                 return
103                                 Optional
104                                                 .ofNullable(streams.store(storeParameters).get(username))
105                                                 .map(json ->
106                                                 {
107                                                         try
108                                                         {
109                                                                 return mapper.readValue(json, UserRanking.class);
110                                                         }
111                                                         catch (JsonProcessingException e)
112                                                         {
113                                                                 throw new RuntimeException(e);
114                                                         }
115                                                 });
116         }
117
118         @PostConstruct
119         public void start()
120         {
121                 log.info("Starting Stream-Processor");
122                 streams.start();
123         }
124
125         @PreDestroy
126         public void stop()
127         {
128                 log.info("Stopping Stream-Processor");
129                 streams.close();
130         }
131 }