From 1b0e793bec5424b74df264dd98a80fb8375167ba Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 7 Jun 2020 11:59:21 +0200 Subject: [PATCH] =?utf8?q?streams=20-=20=C3=9Cbungen=20-=20Microservices?= =?utf8?q?=20-=20Schritt=2002=20--=20Referenz=20auf=20State-Store=20wird?= =?utf8?q?=20dauerhaft=20gespeichert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Die Referenz auf den State-Store wird nicht bei jedem GET-Aufruf neu geholt * Der richtige Ort dafür ist ein State-Listener * Grund: Ruft man die Refrenz direkt nach dem Start der KafkaStreams-Instanz auf, befindet sich die App noch nicht im Zustand RUNNING und der Abruf schlegt fehl! --- .../de/trion/microservices/details/DetailsService.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/details/src/main/java/de/trion/microservices/details/DetailsService.java b/details/src/main/java/de/trion/microservices/details/DetailsService.java index a1f8dbd..a6f918f 100644 --- a/details/src/main/java/de/trion/microservices/details/DetailsService.java +++ b/details/src/main/java/de/trion/microservices/details/DetailsService.java @@ -36,6 +36,8 @@ public class DetailsService private final int port; private final KafkaStreams streams; + private ReadOnlyKeyValueStore orders; + public DetailsService(ApplicationProperties config) { @@ -70,6 +72,11 @@ public class DetailsService LOG.error("Could not close KafkaStreams!", ex); } }); + streams.setStateListener((newState, oldState) -> + { + if (newState == KafkaStreams.State.RUNNING) + orders = streams.store(topic, QueryableStoreTypes.keyValueStore()); + }); } @@ -92,8 +99,6 @@ public class DetailsService .build(); } - ReadOnlyKeyValueStore orders; - orders = streams.store(topic, QueryableStoreTypes.keyValueStore()); Order order = orders.get(id); return order == null ? ResponseEntity.notFound().build() -- 2.20.1