Further simplified the example: Knowledge about the key is not required
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
1 package de.juplo.demo.kafka.deduplication;
2
3
4 import org.apache.kafka.common.serialization.IntegerSerializer;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.ValueTransformer;
8 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
9 import org.apache.kafka.streams.state.*;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.springframework.boot.autoconfigure.web.ServerProperties;
13 import org.springframework.http.HttpHeaders;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.ResponseEntity;
17 import org.springframework.web.bind.annotation.GetMapping;
18 import org.springframework.web.bind.annotation.PathVariable;
19 import org.springframework.web.bind.annotation.RestController;
20
21 import javax.annotation.PostConstruct;
22 import javax.annotation.PreDestroy;
23 import java.time.Duration;
24 import java.util.Properties;
25
26
27 @RestController
28 public class Deduplicator
29 {
30   final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
31
32   public final KafkaStreams streams;
33   public final String host;
34   public final int port;
35
36   public Deduplicator(
37       ServerProperties serverProperties,
38       StreamsHealthIndicator healthIndicator)
39   {
40     Properties properties = new Properties();
41     properties.put("bootstrap.servers", "kafka:9092");
42     properties.put("application.id", "streams-deduplicator");
43     properties.put("default.key.serde", Serdes.StringSerde.class);
44     properties.put("default.value.serde", Serdes.StringSerde.class);
45
46     this.host = serverProperties.getAddress().getHostAddress();
47     this.port = serverProperties.getPort();
48     properties.put("application.server", host + ":" + port);
49
50     streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
51     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
52     {
53       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
54       try
55       {
56         streams.close(Duration.ofSeconds(5));
57       }
58       catch (Exception ex)
59       {
60         LOG.error("Could not close KafkaStreams!", ex);
61       }
62     });
63     streams.setStateListener(healthIndicator);
64   }
65
66   static Topology buildTopology()
67   {
68     StreamsBuilder builder = new StreamsBuilder();
69
70     // Create state-store for sequence numbers
71     StoreBuilder<KeyValueStore<Integer,Long>> store =
72         Stores.keyValueStoreBuilder(
73             Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
74             Serdes.Integer(),
75             Serdes.Long());
76     // register store
77     builder.addStateStore(store);
78
79     builder
80         .<String, String>stream("input")
81         .flatTransformValues(
82             new ValueTransformerSupplier<String, Iterable<String>>()
83             {
84               @Override
85               public ValueTransformer<String, Iterable<String>> get()
86               {
87                 return new DeduplicationTransformer();
88               }
89             },
90             DeduplicationTransformer.STORE)
91         .to("output");
92
93     return builder.build();
94   }
95
96   @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE)
97   public ResponseEntity<String> getWatermarks(@PathVariable Integer partition)
98   {
99     KeyQueryMetadata metadata =
100         streams.queryMetadataForKey(
101             DeduplicationTransformer.STORE,
102             partition, new IntegerSerializer());
103
104     if (metadata.getActiveHost().port() != port ||
105         !metadata.getActiveHost().host().equals(host))
106     {
107       return
108           ResponseEntity
109               .status(HttpStatus.TEMPORARY_REDIRECT)
110               .header(
111                   HttpHeaders.LOCATION,
112                   "http://" +
113                       metadata.getActiveHost().host() +
114                       ":" +
115                       metadata.getActiveHost().port() +
116                       "/watermark/" +
117                       partition)
118               .build();
119     }
120
121     ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
122         StoreQueryParameters.fromNameAndType(
123             DeduplicationTransformer.STORE,
124             QueryableStoreTypes.keyValueStore()));
125
126     Long watermark = store.get(partition);
127     if (watermark == null)
128       return ResponseEntity.notFound().build();
129
130     return ResponseEntity.ok().body(watermark.toString());
131   }
132
133
134   @PostConstruct
135   public void start()
136   {
137     streams.start();
138   }
139
140   @PreDestroy
141   public void stop()
142   {
143     streams.close(Duration.ofSeconds(5));
144   }
145 }