- Topology topology = builder.build();
- streams = new KafkaStreams(topology, properties);
- streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
- {
- LOG.error("Unexpected error in thread {}: {}", t, e.toString());
- try
- {
- streams.close(Duration.ofSeconds(5));
- }
- catch (Exception ex)
- {
- LOG.error("Could not close KafkaStreams!", ex);
- }
- });