splitter: 1.0.0-vanilla-kafka - Fixed the test "Context Loads"
demos/kafka/wordcount/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
package de.juplo.kafka.wordcount.splitter;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.PreDestroy;
import java.time.Clock;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

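/**
 * Splits the lines read from the input topic into single words and writes
 * each word to the output topic. Consumed offsets are committed through the
 * transactional producer, so that offsets and output records are written
 * atomically - a consume-transform-produce loop with exactly-once semantics
 * built on the plain Kafka clients.
 */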
@Component
@Slf4j
public class SplitterStreamProcessor implements Runnable
{
  final static Pattern PATTERN = Pattern.compile("\\W+");

  private final String inputTopic;
  private final String outputTopic;
  private final KafkaConsumer<String, String> consumer;
  private final KafkaProducer<String, String> producer;
  private final Clock clock;
  private final int commitInterval;
  private final Lock running = new ReentrantLock();

  // volatile, because the flag is written by stop() during shutdown
  // and read by run() on the task-executor thread
  private volatile boolean stopped = false;
  private long[] offsets;
  private Optional<Integer>[] leaderEpochs;
  private long lastCommit;

  public SplitterStreamProcessor(
      SplitterApplicationProperties properties,
      Clock clock,
      TaskExecutor executor)
  {
    this.inputTopic = properties.getInputTopic();
    this.outputTopic = properties.getOutputTopic();

    this.clock = clock;
    this.commitInterval = properties.getCommitInterval();

    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");

    Properties props;

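    // Auto-commit must be disabled: the consumed offsets are committed
    // through the transactional producer instead (see commitTransaction())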
    props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumer = new KafkaConsumer<>(props);

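    // Transactions require idempotence and a transactional.id. Note that a
    // fixed id like "my-ID" restricts the processor to a single instance:
    // a second instance started with the same id would fence this one off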
    props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producer = new KafkaProducer<>(props);

    executor.execute(this);
  }

  public void run()
  {
    running.lock();

    try
    {
      log.info("Initializing transaction");
      producer.initTransactions();

      log.info("Subscribing to topic {}", inputTopic);
      consumer.subscribe(
          Arrays.asList(inputTopic),
          new TransactionalConsumerRebalanceListener());

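      // The consume-transform-produce loop. The enclosing transaction is
      // started in onPartitionsAssigned() and re-started after each commit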
      while (!stopped)
      {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        records.forEach(inputRecord ->
        {
          log.debug(
              "Received record: key={}, partition={}, offset={}, epoch={}",
              inputRecord.key(),
              inputRecord.partition(),
              inputRecord.offset(),
              inputRecord.leaderEpoch());

          // Remember offset + 1: a committed offset always denotes the
          // *next* record to be read, not the last record that was processed
          offsets[inputRecord.partition()] = inputRecord.offset() + 1;
          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();

          String[] words = PATTERN.split(inputRecord.value());
          for (int i = 0; i < words.length; i++)
          {
            ProducerRecord<String, String> outputRecord =
                new ProducerRecord<>(
                    outputTopic,
                    inputRecord.key(),
                    words[i].trim());

            producer.send(outputRecord, (metadata, exception) ->
            {
              if (exception == null)
              {
                // Success: the broker has acknowledged the record
                log.debug(
                    "Sent {}={}, partition={}, offset={}",
                    outputRecord.key(),
                    outputRecord.value(),
                    metadata.partition(),
                    metadata.offset());
              }
              else
              {
                // Error: an unrecoverable send error also moves the ongoing
                // transaction into an error state, so that the subsequent
                // commitTransaction() will throw
                log.error(
                    "Could not send {}={}:",
                    outputRecord.key(),
                    outputRecord.value(),
                    exception);
              }
            });
          }

          long delta = clock.millis() - lastCommit;
          if (delta > commitInterval)
          {
            log.info("Elapsed time since last commit: {}ms", delta);
            commitTransaction();
            beginTransaction();
          }
        });
      }
    }
    catch (WakeupException e)
    {
      log.info("Woken up: shutting down!");
      // Not necessary, because consumer.close() triggers onPartitionsRevoked()!
      // commitTransaction();
    }
    catch (Exception e)
    {
      log.error("Unexpected exception!", e);
      producer.abortTransaction();
    }
    finally
    {
      try
      {
        log.info("Closing consumer");
        consumer.close();
        log.info("Closing producer");
        producer.close();
        log.info("Exiting!");
      }
      finally
      {
        running.unlock();
      }
    }
  }

  private void beginTransaction()
  {
    log.info("Beginning new transaction");
    lastCommit = clock.millis();
    producer.beginTransaction();
  }

  private void commitTransaction()
  {
    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
    for (int i = 0; i < offsets.length; i++)
    {
      // 0 means: no record has been consumed from this partition yet
      if (offsets[i] > 0)
      {
        offsetsToSend.put(
            new TopicPartition(inputTopic, i),
            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
      }
    }
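    // Sending the offsets through the producer makes them part of the
    // transaction: they are committed or aborted atomically together with
    // the words written to the output topic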
    producer.sendOffsetsToTransaction(
        offsetsToSend,
        consumer.groupMetadata());
    log.info("Committing transaction");
    producer.commitTransaction();
  }

  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
  {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
    {
      log.info("Assigned partitions: {}", toString(partitions));

      // Compute the length of an array that can be used to store the offsets
      // (We can ignore the topic, since we only read from a single one!)
      int length =
          partitions
              .stream()
              .reduce(
                  0,
                  (i, v) -> i < v.partition() ? v.partition() : i,
                  (l, r) -> l < r ? r : l) + 1;
      offsets = new long[length];
      leaderEpochs = new Optional[length];

      beginTransaction();
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
    {
      log.info("Revoked partitions: {}", toString(partitions));
      commitTransaction();
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions)
    {
      log.info("Lost partitions: {}", toString(partitions));
      producer.abortTransaction();
    }

    String toString(Collection<TopicPartition> partitions)
    {
      return
          partitions
              .stream()
              .map(tp -> tp.topic() + "-" + tp.partition())
              .collect(Collectors.joining(", "));
    }
  }

  @PreDestroy
  public void stop()
  {
    log.info("Shutdown requested...");
    stopped = true;
    consumer.wakeup();
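    // Wait until run() releases the lock, i.e. until the processing loop
    // has terminated and consumer and producer have been closed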
    running.lock();
    log.info("Shutdown completed!");
  }
}
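
Side note: the constructor takes all of its settings from SplitterApplicationProperties, which is not part of this commit. A minimal sketch of what such a class could look like - assuming the usual Spring Boot @ConfigurationProperties pattern and the prefix juplo.wordcount.splitter implied by the assertion message above; the default values are pure assumptions, only the getters are dictated by the processor:

package de.juplo.kafka.wordcount.splitter;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("juplo.wordcount.splitter")
@Getter
@Setter
public class SplitterApplicationProperties
{
  // All default values are assumptions for illustration only
  private String bootstrapServer = "localhost:9092";
  private String groupId = "splitter";
  private String inputTopic = "recordings";
  private String outputTopic = "words";
  private int commitInterval = 1000; // compared against clock.millis(), hence milliseconds
}

For the binding to work, the class additionally has to be enabled somewhere, e.g. with @EnableConfigurationProperties(SplitterApplicationProperties.class) on the application class.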