791e164d6fe3ddfdf8600dec4430e082462b2733
demos/kafka/wordcount/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
package de.juplo.kafka.wordcount.splitter;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.PreDestroy;
import java.time.Clock;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


/**
 * Splits each message read from the input topic into words and forwards the
 * words to an output topic, using a transactional producer so that the
 * consumed offsets and the produced records are committed atomically
 * (exactly-once processing).
 */
@Component
@Slf4j
public class SplitterStreamProcessor implements ApplicationRunner
{
  final static Pattern PATTERN = Pattern.compile("\\W+");

  private final String inputTopic;
  private final String outputTopic;
  private final KafkaConsumer<String, String> consumer;
  private final KafkaProducer<String, String> producer;
  private final Clock clock;
  private final int commitInterval;
  // Held by run() while the processing loop is active; stop() blocks on this
  // lock until the loop has terminated
  private final Lock running = new ReentrantLock();

  // Written by stop() from a different thread, hence volatile
  private volatile boolean stopped = false;
  // Offset and leader epoch of the last processed record, indexed by partition
  private long[] offsets;
  private Optional<Integer>[] leaderEpochs;
  private long lastCommit;

  public SplitterStreamProcessor(
      SplitterApplicationProperties properties,
      Clock clock)
  {
    this.inputTopic = properties.getInputTopic();
    this.outputTopic = properties.getOutputTopic();

    this.clock = clock;
    this.commitInterval = properties.getCommitInterval();

    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");

    Properties props;

    props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    // Offsets are committed through the transactional producer, so the
    // consumer must not auto-commit them
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumer = new KafkaConsumer<>(props);

    props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // Note: the transactional ID must be unique per producer instance, so a
    // hard-coded value like this only works for a single-instance deployment
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producer = new KafkaProducer<>(props);
  }
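
  /*
   * Overview of the processing cycle implemented in run() below -- this is
   * only a summary of the code that follows, not additional behaviour:
   *
   *   initTransactions()              -- once; fences older producer incarnations
   *   subscribe(inputTopic, listener) -- listener begins/commits transactions on rebalances
   *   loop until stopped:
   *     poll() and, for each record:
   *       remember offset and leader epoch per partition
   *       split the value into words, send one output record per word
   *     if the commit interval has elapsed:
   *       commitTransaction() (includes sendOffsetsToTransaction()), then beginTransaction()
   *
   * Because the consumed offsets are committed as part of the producer
   * transaction, input and output are confirmed atomically.
   */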

  @Override
  public void run(ApplicationArguments args)
  {
    running.lock();

    try
    {
      log.info("Initializing transaction");
      producer.initTransactions();

      log.info("Subscribing to topic {}", inputTopic);
      consumer.subscribe(
          Arrays.asList(inputTopic),
          new TransactionalConsumerRebalanceListener());

      while (!stopped)
      {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        records.forEach(inputRecord ->
        {
          log.debug(
              "Received record with key={}, partition={}, offset={}, epoch={}",
              inputRecord.key(),
              inputRecord.partition(),
              inputRecord.offset(),
              inputRecord.leaderEpoch());

          // Remember the position of the last record processed per partition,
          // so that commitTransaction() can commit it along with the output
          offsets[inputRecord.partition()] = inputRecord.offset();
          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();

          String[] words = PATTERN.split(inputRecord.value());
          for (String word : words)
          {
            // PATTERN.split() yields a leading empty string, if the message
            // starts with a non-word character -- skip it
            if (word.isEmpty())
              continue;

            ProducerRecord<String, String> outputRecord =
                new ProducerRecord<>(
                    outputTopic,
                    inputRecord.key(),
                    word);

            producer.send(outputRecord, (metadata, exception) ->
            {
              if (exception == null)
              {
                // Success: log where the record was written
                log.debug(
                    "Sent {}={}, partition={}, offset={}",
                    outputRecord.key(),
                    outputRecord.value(),
                    metadata.partition(),
                    metadata.offset());
              }
              else
              {
                // Error: only logged -- a failed send poisons the transaction,
                // so the subsequent commit will fail
                log.error(
                    "Could not send {}={}: {}",
                    outputRecord.key(),
                    outputRecord.value(),
                    exception.toString());
              }
            });
          }

          long delta = clock.millis() - lastCommit;
          if (delta > commitInterval)
          {
            log.info("Elapsed time since last commit: {}ms", delta);
            commitTransaction();
            beginTransaction();
          }
        });
      }
    }
    catch (WakeupException e)
    {
      log.info("Waking up: shutdown was requested!");
      // Not necessary, because consumer.close() triggers onPartitionsRevoked()!
      // commitTransaction();
    }
    catch (Exception e)
    {
      log.error("Unexpected exception!", e);
      // Abort, so that neither the output records nor the offsets become visible
      producer.abortTransaction();
    }
    finally
    {
      try
      {
        log.info("Closing consumer");
        consumer.close();
        log.info("Closing producer");
        producer.close();
        log.info("Exiting!");
      }
      finally
      {
        running.unlock();
      }
    }
  }
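
  /*
   * Note on the error path above: abortTransaction() presumes that a
   * transaction is currently open. If the exception was thrown before the
   * first beginTransaction() (e.g. by initTransactions() itself), the abort
   * would fail with an IllegalStateException -- acceptable for this demo,
   * since the application is shutting down at that point anyway.
   */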

  private void beginTransaction()
  {
    log.info("Beginning new transaction");
    lastCommit = clock.millis();
    producer.beginTransaction();
  }

  private void commitTransaction()
  {
    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
    for (int i = 0; i < offsets.length; i++)
    {
      // Partitions without any processed record are marked with -1
      if (offsets[i] >= 0)
      {
        offsetsToSend.put(
            new TopicPartition(inputTopic, i),
            // The committed offset has to denote the *next* record to read,
            // hence: last processed offset + 1
            new OffsetAndMetadata(offsets[i] + 1, leaderEpochs[i], ""));
      }
    }
    producer.sendOffsetsToTransaction(
        offsetsToSend,
        consumer.groupMetadata());
    log.info("Committing transaction");
    producer.commitTransaction();
  }
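
  /*
   * Worked example for the offset bookkeeping above: if the last record
   * processed from partition 2 had offset 41, then offsets[2] == 41 and the
   * transaction commits OffsetAndMetadata(42) for partition 2, so the group
   * resumes reading at offset 42 and record 41 is not processed twice.
   * Passing consumer.groupMetadata() additionally allows the broker to fence
   * off commits from consumers of an older group generation after a rebalance.
   */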

  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
  {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
    {
      log.info("Assigned partitions: {}", toString(partitions));

      // Compute the length of an array that can be used to store the offsets
      // (we can ignore the topic, since we only read from a single one!)
      int length =
          partitions
              .stream()
              .reduce(
                  0,
                  (i, v) -> i < v.partition() ? v.partition() : i,
                  (l, r) -> l < r ? r : l) + 1;
      offsets = new long[length];
      // Mark all partitions as "nothing processed yet" (see commitTransaction())
      Arrays.fill(offsets, -1);
      leaderEpochs = new Optional[length];

      beginTransaction();
    }
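
    /*
     * The reduce() above simply computes max(partition number) + 1. A sketch
     * of an equivalent, arguably more readable formulation:
     *
     *   int length =
     *       partitions
     *           .stream()
     *           .mapToInt(TopicPartition::partition)
     *           .max()
     *           .orElse(0) + 1;
     */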

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
    {
      log.info("Revoked partitions: {}", toString(partitions));
      // Commit, so that the work done so far becomes visible before the
      // partitions are handed over to another consumer
      commitTransaction();
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions)
    {
      log.info("Lost partitions: {}", toString(partitions));
      // The partitions were lost without an orderly revocation: abort, so
      // that the unfinished work is rolled back
      producer.abortTransaction();
    }

    String toString(Collection<TopicPartition> partitions)
    {
      return
          partitions
              .stream()
              .map(tp -> tp.topic() + "-" + tp.partition())
              .collect(Collectors.joining(", "));
    }
  }

  @PreDestroy
  public void stop()
  {
    log.info("Shutdown requested...");
    stopped = true;
    // Interrupt a possibly blocking poll() in the processing loop
    consumer.wakeup();
    // Block until run() has released the lock, i.e. the loop has terminated
    running.lock();
    log.info("Shutdown completed!");
  }
}
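
/*
 * A minimal configuration sketch for this processor. Only the
 * bootstrap-server key is confirmed by the Assert in the constructor; the
 * other property names are assumptions derived from the getters of
 * SplitterApplicationProperties:
 *
 *   juplo.wordcount.splitter.bootstrap-server=localhost:9092
 *   juplo.wordcount.splitter.group-id=splitter
 *   juplo.wordcount.splitter.input-topic=sentences
 *   juplo.wordcount.splitter.output-topic=words
 *   juplo.wordcount.splitter.commit-interval=1000   (milliseconds, judging by
 *                                                    the comparison with clock.millis())
 */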