streams - Übungen - Microservices - Schritt 02
authorKai Moritz <kai@juplo.de>
Sun, 7 Jun 2020 09:59:21 +0000 (11:59 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 7 Jun 2020 15:45:48 +0000 (17:45 +0200)
--
Referenz auf State-Store wird dauerhaft gespeichert

* Die Referenz auf den State-Store wird nicht bei jedem GET-Aufruf neu geholt
* Der richtige Ort dafür ist ein State-Listener
* Grund: Ruft man die Refrenz direkt nach dem Start der KafkaStreams-Instanz
  auf, befindet sich die App noch nicht im Zustand RUNNING und der Abruf
  schlegt fehl!

details/src/main/java/de/trion/microservices/details/DetailsService.java

index a1f8dbd..a6f918f 100644 (file)
@@ -36,6 +36,8 @@ public class DetailsService
   private final int port;
   private final KafkaStreams streams;
 
+  private ReadOnlyKeyValueStore<String, Order> orders;
+
 
   public DetailsService(ApplicationProperties config)
   {
@@ -70,6 +72,11 @@ public class DetailsService
         LOG.error("Could not close KafkaStreams!", ex);
       }
     });
+    streams.setStateListener((newState, oldState) ->
+    {
+      if (newState == KafkaStreams.State.RUNNING)
+        orders = streams.store(topic, QueryableStoreTypes.keyValueStore());
+    });
   }
 
 
@@ -92,8 +99,6 @@ public class DetailsService
               .build();
     }
 
-    ReadOnlyKeyValueStore<String, Order> orders;
-    orders = streams.store(topic, QueryableStoreTypes.keyValueStore());
     Order order = orders.get(id);
     return order == null
         ? ResponseEntity.notFound().build()