users: 1.0.1 - added an endpoint to receive details fo a user
authorKai Moritz <kai@juplo.de>
Thu, 14 Oct 2021 20:11:01 +0000 (22:11 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 16 Oct 2021 13:44:11 +0000 (15:44 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/users/UsersController.java
src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 5790169..ef0f148 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>users</artifactId>
-       <version>1.0.0</version>
+       <version>1.0.1</version>
        <name>Wordcount-Users</name>
        <description>Users-service of the multi-user wordcount-example</description>
        <properties>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-clients</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka-streams</artifactId>
+               </dependency>
                <dependency>
                        <groupId>org.hibernate.validator</groupId>
                        <artifactId>hibernate-validator</artifactId>
index 8218f99..2c50208 100644 (file)
@@ -14,5 +14,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class UsersApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
+  private String applicationId = "users";
   private String topic = "users";
+  private String applicationServer = "localhost:8080";
 }
index 4e64524..b0ab941 100644 (file)
@@ -4,14 +4,16 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
+import java.net.URI;
+import java.util.Optional;
+
 
 @RestController
 public class UsersController
@@ -19,16 +21,20 @@ public class UsersController
   private final String topic;
   private final ObjectMapper mapper;
   private final KafkaProducer<String, String> producer;
+  private final UsersStreamProcessor processor;
+
 
 
   public UsersController(
       UsersApplicationProperties properties,
       ObjectMapper mapper,
-      KafkaProducer<String,String> producer)
+      KafkaProducer<String,String> producer,
+      UsersStreamProcessor processor)
   {
     this.topic = properties.getTopic();
     this.mapper = mapper;
     this.producer = producer;
+    this.processor = processor;
   }
 
   @PostMapping(
@@ -76,4 +82,27 @@ public class UsersController
 
     return result;
   }
+
+  @GetMapping("/users/{username}")
+  ResponseEntity<User> queryFor(@PathVariable String username)
+  {
+    Optional<URI> redirect = processor.getRedirect(username);
+    if (redirect.isPresent())
+    {
+      return
+          ResponseEntity
+              .status(HttpStatus.TEMPORARY_REDIRECT)
+              .location(redirect.get())
+              .build();
+    }
+
+    try
+    {
+      return ResponseEntity.of(processor.getUser(username));
+    }
+    catch (InvalidStateStoreException e)
+    {
+      return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+    }
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java
new file mode 100644 (file)
index 0000000..ef39e89
--- /dev/null
@@ -0,0 +1,118 @@
+package de.juplo.kafka.wordcount.users;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.net.URI;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Slf4j
+@Component
+public class UsersStreamProcessor
+{
+       public final KafkaStreams streams;
+       public final HostInfo hostInfo;
+       public final String storeName = "rankingsByUsername";
+       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> storeParameters;
+       public final ObjectMapper mapper;
+
+
+       public UsersStreamProcessor(
+                       UsersApplicationProperties properties,
+                       ObjectMapper mapper,
+                       ConfigurableApplicationContext context)
+       {
+               StreamsBuilder builder = new StreamsBuilder();
+               builder.table(properties.getTopic(), Materialized.as(storeName));
+
+               Properties props = new Properties();
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               streams = new KafkaStreams(builder.build(), props);
+               streams.setUncaughtExceptionHandler((Throwable e) ->
+               {
+                       log.error("Unexpected error!", e);
+                       CompletableFuture.runAsync(() ->
+                       {
+                               log.info("Stopping application...");
+                               SpringApplication.exit(context, () -> 1);
+                       });
+                       return SHUTDOWN_CLIENT;
+               });
+
+               hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer());
+               storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());;
+               this.mapper = mapper;
+       }
+
+       public Optional<URI> getRedirect(String username)
+       {
+               KeyQueryMetadata metadata = streams.queryMetadataForKey(storeName, username, Serdes.String().serializer());
+               HostInfo activeHost = metadata.activeHost();
+               log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
+
+               if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
+               {
+                       return Optional.empty();
+               }
+
+               URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username);
+               log.debug("Redirecting to {}", location);
+               return Optional.of(location);
+       }
+
+       public Optional<User> getUser(String username)
+       {
+               return
+                               Optional
+                                               .ofNullable(streams.store(storeParameters).get(username))
+                                               .map(json ->
+                                               {
+                                                       try
+                                                       {
+                                                               return mapper.readValue(json, User.class);
+                                                       }
+                                                       catch (JsonProcessingException e)
+                                                       {
+                                                               throw new RuntimeException(e);
+                                                       }
+                                               });
+       }
+
+       @PostConstruct
+       public void start()
+       {
+               log.info("Starting Stream-Processor");
+               streams.start();
+       }
+
+       @PreDestroy
+       public void stop()
+       {
+               log.info("Stopping Stream-Processor");
+               streams.close();
+       }
+}