splitter: Implemented a test that proves the splitting process
[demos/kafka/wordcount] src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
package de.juplo.kafka.wordcount.splitter;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.PreDestroy;
import java.time.Clock;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;


@Component
@Slf4j
public class SplitterStreamProcessor implements Runnable
{
  private final MessageSplitter splitter;
  private final String inputTopic;
  private final String outputTopic;
  private final KafkaConsumer<String, String> consumer;
  private final KafkaProducer<String, String> producer;
  private final Clock clock;
  private final int commitInterval;
  private final Lock running = new ReentrantLock();

  // volatile: written by the thread that calls stop(), read by the polling thread
  private volatile boolean stopped = false;
  private long[] offsets;
  private Optional<Integer>[] leaderEpochs;
  private long lastCommit;

  public SplitterStreamProcessor(
      MessageSplitter splitter,
      SplitterApplicationProperties properties,
      Clock clock,
      TaskExecutor executor)
  {
    this.splitter = splitter;

    this.inputTopic = properties.getInputTopic();
    this.outputTopic = properties.getOutputTopic();

    this.clock = clock;
    this.commitInterval = properties.getCommitInterval();

    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");

    Properties props;

    props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumer = new KafkaConsumer<>(props);

    props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    // Note: a fixed transactional ID is only safe, as long as exactly one
    // instance of this processor is running
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producer = new KafkaProducer<>(props);

    executor.execute(this);
  }
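
  // run() implements a transactional consume-transform-produce loop:
  // all words that are derived from the polled messages are sent inside
  // the currently open transaction, and the consumed offsets are
  // committed through the producer (see commitTransaction()), so that
  // reading, splitting and writing form a single atomic unit.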
  public void run()
  {
    running.lock();

    try
    {
      log.info("Initializing transaction");
      producer.initTransactions();

      log.info("Subscribing to topic {}", inputTopic);
      consumer.subscribe(
          Arrays.asList(inputTopic),
          new TransactionalConsumerRebalanceListener());

      while (!stopped)
      {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        records.forEach(inputRecord ->
        {
          log.debug(
              "Received record with key={}, partition={}, offset={}, epoch={}",
              inputRecord.key(),
              inputRecord.partition(),
              inputRecord.offset(),
              inputRecord.leaderEpoch());

          // Remember the position of the _next_ record to be consumed:
          // Kafka expects committed offsets to point at the next record,
          // so committing offset() as-is would replay the last record
          offsets[inputRecord.partition()] = inputRecord.offset() + 1;
          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();

          String[] words = splitter.split(inputRecord.value());
          for (int i = 0; i < words.length; i++)
          {
            ProducerRecord<String, String> outputRecord =
                new ProducerRecord<>(
                    outputTopic,
                    inputRecord.key(),
                    words[i].trim());

            producer.send(outputRecord, (metadata, exception) ->
            {
              if (exception == null)
              {
                // HANDLE SUCCESS
                log.debug(
                    "Sent {}={}, partition={}, offset={}",
                    outputRecord.key(),
                    outputRecord.value(),
                    metadata.partition(),
                    metadata.offset());
              }
              else
              {
                // HANDLE ERROR
                log.error(
                    "Could not send {}={}: {}",
                    outputRecord.key(),
                    outputRecord.value(),
                    exception.toString());
              }
            });
          }

          long delta = clock.millis() - lastCommit;
          if (delta > commitInterval)
          {
            log.info("Elapsed time since last commit: {}ms", delta);
            commitTransaction();
            beginTransaction();
          }
        });
      }
    }
    catch (WakeupException e)
    {
      log.info("Waking up from exception!");
      // Not necessary, because consumer.close() triggers onPartitionsRevoked()!
      // commitTransaction();
    }
    catch (Exception e)
    {
      log.error("Unexpected exception!", e);
      producer.abortTransaction();
    }
    finally
    {
      try
      {
        log.info("Closing consumer");
        consumer.close();
        log.info("Closing producer");
        producer.close();
        log.info("Exiting!");
      }
      finally
      {
        running.unlock();
      }
    }
  }

  private void beginTransaction()
  {
    log.info("Beginning new transaction");
    lastCommit = clock.millis();
    producer.beginTransaction();
  }
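
  // sendOffsetsToTransaction() attaches the offsets of the consumed records
  // to the currently open transaction: they only become visible to the
  // consumer group if the transaction is committed, so consuming the input
  // and producing the words either succeed or fail together.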
  private void commitTransaction()
  {
    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
    for (int i = 0; i < offsets.length; i++)
    {
      // offsets[i] == 0 means, that no record was consumed for this partition yet
      if (offsets[i] > 0)
      {
        offsetsToSend.put(
            new TopicPartition(inputTopic, i),
            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
      }
    }
    producer.sendOffsetsToTransaction(
        offsetsToSend,
        consumer.groupMetadata());
    log.info("Committing transaction");
    producer.commitTransaction();
  }
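
  // Since the consumed offsets are part of the transaction, a rebalance has
  // to close the currently open transaction: offsets for cleanly revoked
  // partitions are committed, while lost partitions can only trigger an
  // abort, because they may already be processed by another instance.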
  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
  {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
    {
      log.info("Assigned partitions: {}", toString(partitions));

      // Compute the length of an array that can be used to store the offsets
      // (we can ignore the topic, since we only read from a single one!)
      int length =
          partitions
              .stream()
              .mapToInt(TopicPartition::partition)
              .max()
              .orElse(0) + 1;
      offsets = new long[length];
      leaderEpochs = new Optional[length];

      beginTransaction();
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
    {
      log.info("Revoked partitions: {}", toString(partitions));
      commitTransaction();
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions)
    {
      log.info("Lost partitions: {}", toString(partitions));
      producer.abortTransaction();
    }

    String toString(Collection<TopicPartition> partitions)
    {
      return
          partitions
              .stream()
              .map(tp -> tp.topic() + "-" + tp.partition())
              .collect(Collectors.joining(", "));
    }
  }

  @PreDestroy
  public void stop()
  {
    log.info("Shutdown requested...");
    if (stopped)
    {
      log.warn("Ignoring request: already stopped!");
      return;
    }
    stopped = true;
    consumer.wakeup();
    // Block, until run() has released the lock, i.e. has finished
    running.lock();
    log.info("Shutdown completed!");
  }
}
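
The test referred to in the commit message is not part of this file. As a rough sketch of what a unit test for the splitting process could look like: the class name SimpleMessageSplitter, the whitespace-splitting semantics and the use of JUnit 5 with AssertJ are assumptions here; only the split(String) method is taken from the code above.

package de.juplo.kafka.wordcount.splitter;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class MessageSplitterTest
{
  @Test
  public void splitsASentenceIntoSingleWords()
  {
    // Hypothetical implementation class: the commit only shows, that a
    // MessageSplitter offers split(String), not how it is implemented
    MessageSplitter splitter = new SimpleMessageSplitter();

    String[] words = splitter.split("Hello Kafka world");

    // Assumed semantics: the sentence is split on whitespace
    assertThat(words).containsExactly("Hello", "Kafka", "world");
  }
}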