Simplified code using a lambda
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
1 package de.juplo.demo.kafka.deduplication;
2
3
4 import jakarta.annotation.PostConstruct;
5 import jakarta.annotation.PreDestroy;
6 import org.apache.kafka.common.serialization.IntegerSerializer;
7 import org.apache.kafka.common.serialization.Serdes;
8 import org.apache.kafka.streams.*;
9 import org.apache.kafka.streams.kstream.ValueTransformer;
10 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
11 import org.apache.kafka.streams.state.*;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.boot.autoconfigure.web.ServerProperties;
15 import org.springframework.http.HttpHeaders;
16 import org.springframework.http.HttpStatus;
17 import org.springframework.http.MediaType;
18 import org.springframework.http.ResponseEntity;
19 import org.springframework.web.bind.annotation.GetMapping;
20 import org.springframework.web.bind.annotation.PathVariable;
21 import org.springframework.web.bind.annotation.RestController;
22
23 import java.time.Duration;
24 import java.util.Properties;
25
26
27 @RestController
28 public class Deduplicator
29 {
30   final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
31
32   public final KafkaStreams streams;
33   public final String host;
34   public final int port;
35
36   public Deduplicator(
37       ServerProperties serverProperties,
38       StreamsHealthIndicator healthIndicator)
39   {
40     Properties properties = new Properties();
41     properties.put("bootstrap.servers", "kafka:9092");
42     properties.put("application.id", "streams-deduplicator");
43     properties.put("default.key.serde", Serdes.StringSerde.class);
44     properties.put("default.value.serde", Serdes.StringSerde.class);
45
46     this.host = serverProperties.getAddress().getHostAddress();
47     this.port = serverProperties.getPort();
48     properties.put("application.server", host + ":" + port);
49
50     streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
51     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
52     {
53       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
54       try
55       {
56         streams.close(Duration.ofSeconds(5));
57       }
58       catch (Exception ex)
59       {
60         LOG.error("Could not close KafkaStreams!", ex);
61       }
62     });
63     streams.setStateListener(healthIndicator);
64   }
65
66   static Topology buildTopology()
67   {
68     StreamsBuilder builder = new StreamsBuilder();
69
70     // Create state-store for sequence numbers
71     StoreBuilder<KeyValueStore<Integer,Long>> store =
72         Stores.keyValueStoreBuilder(
73             Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
74             Serdes.Integer(),
75             Serdes.Long());
76     // register store
77     builder.addStateStore(store);
78
79     builder
80         .<String, String>stream("input")
81         .flatTransformValues(
82             () -> new DeduplicationTransformer(),
83             DeduplicationTransformer.STORE)
84         .to("output");
85
86     return builder.build();
87   }
88
89   @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE)
90   public ResponseEntity<String> getWatermarks(@PathVariable Integer partition)
91   {
92     KeyQueryMetadata metadata =
93         streams.queryMetadataForKey(
94             DeduplicationTransformer.STORE,
95             partition, new IntegerSerializer());
96
97     if (metadata.getActiveHost().port() != port ||
98         !metadata.getActiveHost().host().equals(host))
99     {
100       return
101           ResponseEntity
102               .status(HttpStatus.TEMPORARY_REDIRECT)
103               .header(
104                   HttpHeaders.LOCATION,
105                   "http://" +
106                       metadata.getActiveHost().host() +
107                       ":" +
108                       metadata.getActiveHost().port() +
109                       "/watermark/" +
110                       partition)
111               .build();
112     }
113
114     ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
115         StoreQueryParameters.fromNameAndType(
116             DeduplicationTransformer.STORE,
117             QueryableStoreTypes.keyValueStore()));
118
119     Long watermark = store.get(partition);
120     if (watermark == null)
121       return ResponseEntity.notFound().build();
122
123     return ResponseEntity.ok().body(watermark.toString());
124   }
125
126
127   @PostConstruct
128   public void start()
129   {
130     streams.start();
131   }
132
133   @PreDestroy
134   public void stop()
135   {
136     streams.close(Duration.ofSeconds(5));
137   }
138 }