projects
/
demos
/
kafka
/
deduplication
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
232590c
)
Moved the building of the topology into a static method
author
Kai Moritz
<kai@juplo.de>
Fri, 9 Oct 2020 16:07:39 +0000
(18:07 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sat, 10 Oct 2020 19:33:54 +0000
(21:33 +0200)
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
index
af7da70
..
8f173f7
100644
(file)
--- a/
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
+++ b/
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
@@
-36,6
+36,23
@@
public class Deduplicator
properties.put("default.key.serde", Serdes.StringSerde.class);
properties.put("default.value.serde", Serdes.StringSerde.class);
properties.put("default.key.serde", Serdes.StringSerde.class);
properties.put("default.value.serde", Serdes.StringSerde.class);
+ streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
+ streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+ {
+ LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+ try
+ {
+ streams.close(Duration.ofSeconds(5));
+ }
+ catch (Exception ex)
+ {
+ LOG.error("Could not close KafkaStreams!", ex);
+ }
+ });
+ }
+
+ static Topology buildTopology()
+ {
StreamsBuilder builder = new StreamsBuilder();
// Create state-store for sequence numbers
StreamsBuilder builder = new StreamsBuilder();
// Create state-store for sequence numbers
@@
-61,23
+78,9
@@
public class Deduplicator
DeduplicationTransformer.STORE)
.to("output");
DeduplicationTransformer.STORE)
.to("output");
- Topology topology = builder.build();
- streams = new KafkaStreams(topology, properties);
- streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
- {
- LOG.error("Unexpected error in thread {}: {}", t, e.toString());
- try
- {
- streams.close(Duration.ofSeconds(5));
- }
- catch (Exception ex)
- {
- LOG.error("Could not close KafkaStreams!", ex);
- }
- });
+ return builder.build();
}
}
-
@PostConstruct
public void start()
{
@PostConstruct
public void start()
{