--
Referenz auf State-Store wird dauerhaft gespeichert
* Die Referenz auf den State-Store wird nicht bei jedem GET-Aufruf neu geholt
* Der richtige Ort dafür ist ein State-Listener
* Grund: Ruft man die Refrenz direkt nach dem Start der KafkaStreams-Instanz
auf, befindet sich die App noch nicht im Zustand RUNNING und der Abruf
schlegt fehl!
private final int port;
private final KafkaStreams streams;
private final int port;
private final KafkaStreams streams;
+ private ReadOnlyKeyValueStore<String, Order> orders;
+
public DetailsService(ApplicationProperties config)
{
public DetailsService(ApplicationProperties config)
{
LOG.error("Could not close KafkaStreams!", ex);
}
});
LOG.error("Could not close KafkaStreams!", ex);
}
});
+ streams.setStateListener((newState, oldState) ->
+ {
+ if (newState == KafkaStreams.State.RUNNING)
+ orders = streams.store(topic, QueryableStoreTypes.keyValueStore());
+ });
- ReadOnlyKeyValueStore<String, Order> orders;
- orders = streams.store(topic, QueryableStoreTypes.keyValueStore());
Order order = orders.get(id);
return order == null
? ResponseEntity.notFound().build()
Order order = orders.get(id);
return order == null
? ResponseEntity.notFound().build()