From: Kai Moritz Date: Sun, 7 Jun 2020 09:59:21 +0000 (+0200) Subject: streams - Übungen - Microservices - Schritt 02 X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fmicroservices;a=commitdiff_plain;h=1b0e793bec5424b74df264dd98a80fb8375167ba streams - Übungen - Microservices - Schritt 02 -- Referenz auf State-Store wird dauerhaft gespeichert * Die Referenz auf den State-Store wird nicht bei jedem GET-Aufruf neu geholt * Der richtige Ort dafür ist ein State-Listener * Grund: Ruft man die Refrenz direkt nach dem Start der KafkaStreams-Instanz auf, befindet sich die App noch nicht im Zustand RUNNING und der Abruf schlegt fehl! --- diff --git a/details/src/main/java/de/trion/microservices/details/DetailsService.java b/details/src/main/java/de/trion/microservices/details/DetailsService.java index a1f8dbd..a6f918f 100644 --- a/details/src/main/java/de/trion/microservices/details/DetailsService.java +++ b/details/src/main/java/de/trion/microservices/details/DetailsService.java @@ -36,6 +36,8 @@ public class DetailsService private final int port; private final KafkaStreams streams; + private ReadOnlyKeyValueStore orders; + public DetailsService(ApplicationProperties config) { @@ -70,6 +72,11 @@ public class DetailsService LOG.error("Could not close KafkaStreams!", ex); } }); + streams.setStateListener((newState, oldState) -> + { + if (newState == KafkaStreams.State.RUNNING) + orders = streams.store(topic, QueryableStoreTypes.keyValueStore()); + }); } @@ -92,8 +99,6 @@ public class DetailsService .build(); } - ReadOnlyKeyValueStore orders; - orders = streams.store(topic, QueryableStoreTypes.keyValueStore()); Order order = orders.get(id); return order == null ? ResponseEntity.notFound().build()