users: 1.0.1 - added an endpoint to receive details fo a user
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersStreamProcessor.java
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java
new file mode 100644 (file)
index 0000000..ef39e89
--- /dev/null
@@ -0,0 +1,118 @@
+package de.juplo.kafka.wordcount.users;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.net.URI;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Slf4j
+@Component
+public class UsersStreamProcessor
+{
+       public final KafkaStreams streams;
+       public final HostInfo hostInfo;
+       public final String storeName = "rankingsByUsername";
+       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> storeParameters;
+       public final ObjectMapper mapper;
+
+
+       public UsersStreamProcessor(
+                       UsersApplicationProperties properties,
+                       ObjectMapper mapper,
+                       ConfigurableApplicationContext context)
+       {
+               StreamsBuilder builder = new StreamsBuilder();
+               builder.table(properties.getTopic(), Materialized.as(storeName));
+
+               Properties props = new Properties();
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               streams = new KafkaStreams(builder.build(), props);
+               streams.setUncaughtExceptionHandler((Throwable e) ->
+               {
+                       log.error("Unexpected error!", e);
+                       CompletableFuture.runAsync(() ->
+                       {
+                               log.info("Stopping application...");
+                               SpringApplication.exit(context, () -> 1);
+                       });
+                       return SHUTDOWN_CLIENT;
+               });
+
+               hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer());
+               storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());;
+               this.mapper = mapper;
+       }
+
+       public Optional<URI> getRedirect(String username)
+       {
+               KeyQueryMetadata metadata = streams.queryMetadataForKey(storeName, username, Serdes.String().serializer());
+               HostInfo activeHost = metadata.activeHost();
+               log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
+
+               if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
+               {
+                       return Optional.empty();
+               }
+
+               URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username);
+               log.debug("Redirecting to {}", location);
+               return Optional.of(location);
+       }
+
+       public Optional<User> getUser(String username)
+       {
+               return
+                               Optional
+                                               .ofNullable(streams.store(storeParameters).get(username))
+                                               .map(json ->
+                                               {
+                                                       try
+                                                       {
+                                                               return mapper.readValue(json, User.class);
+                                                       }
+                                                       catch (JsonProcessingException e)
+                                                       {
+                                                               throw new RuntimeException(e);
+                                                       }
+                                               });
+       }
+
+       @PostConstruct
+       public void start()
+       {
+               log.info("Starting Stream-Processor");
+               streams.start();
+       }
+
+       @PreDestroy
+       public void stop()
+       {
+               log.info("Stopping Stream-Processor");
+               streams.close();
+       }
+}