1 package de.juplo.kafka.wordcount.splitter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.*;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerConfig;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.WakeupException;
10 import org.apache.kafka.common.serialization.StringDeserializer;
11 import org.apache.kafka.common.serialization.StringSerializer;
12 import org.springframework.boot.ApplicationArguments;
13 import org.springframework.boot.ApplicationRunner;
14 import org.springframework.stereotype.Component;
15 import org.springframework.util.Assert;
17 import javax.annotation.PreDestroy;
18 import java.time.Clock;
19 import java.time.Duration;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23 import java.util.regex.Pattern;
24 import java.util.stream.Collectors;
/**
 * Consumes lines from the input topic, splits each line into single words and
 * republishes every word to the output topic. Uses a transactional producer so
 * that the consumed offsets are committed atomically with the produced records
 * (exactly-once processing).
 *
 * NOTE(review): the class annotations (presumably {@code @Slf4j}/{@code @Component},
 * given the {@code log} usage and the Spring imports) and the opening brace are
 * not visible in this excerpt of the file.
 */
public class SplitterStreamProcessor implements ApplicationRunner
// Matches any run of non-word characters; lines are split on it into words.
final static Pattern PATTERN = Pattern.compile("\\W+");

private final String inputTopic;
private final String outputTopic;
private final KafkaConsumer<String, String> consumer;
private final KafkaProducer<String, String> producer;
// Injected clock so the commit-interval check below is testable.
private final Clock clock;
// Minimum elapsed time (milliseconds) between two transaction commits.
private final int commitInterval;
// Held by run() for the lifetime of the polling loop; allows a shutdown hook
// to block until the loop has exited cleanly.
private final Lock running = new ReentrantLock();

// Flag checked by the polling loop; set to request shutdown.
private boolean stopped = false;
// Offset of the last record seen, indexed by partition number.
// NOTE(review): commitTransaction() sends this value as-is; Kafka expects the
// offset of the NEXT record to consume (last offset + 1) — confirm, otherwise
// the last record of each partition is re-processed after a restart.
private long[] offsets;
// Leader epoch of the last record seen, indexed by partition number.
// NOTE(review): raw Optional[] — generic array creation is impossible, but the
// allocation site deserves a scoped @SuppressWarnings("unchecked").
private Optional<Integer>[] leaderEpochs;
// clock.millis() at the moment the current transaction was begun.
private long lastCommit;
/**
 * Builds the Kafka consumer and the transactional producer from the
 * application properties.
 *
 * NOTE(review): part of the parameter list and body (e.g. the {@code Clock}
 * parameter and its assignment) is not visible in this excerpt.
 */
public SplitterStreamProcessor(
    SplitterApplicationProperties properties,
// Copy the configuration values that are needed later in the polling loop.
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
this.commitInterval = properties.getCommitInterval();

Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");

// Consumer: auto-commit is disabled because offsets are committed through the
// producer transaction (sendOffsetsToTransaction) instead.
props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumer = new KafkaConsumer<>(props);

// Producer: idempotence + transactions for exactly-once delivery.
// NOTE(review): the transactional.id is hard-coded — two running instances
// would fence each other; this should probably come from the properties and
// be unique per instance. Confirm the intended deployment model.
props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producer = new KafkaProducer<>(props);
/**
 * Main processing loop: polls the input topic, splits every record's value
 * into words, sends each word inside the current producer transaction and
 * commits the transaction (together with the consumed offsets) once
 * {@code commitInterval} milliseconds have elapsed.
 *
 * NOTE(review): several lines (try/while framing, braces, the
 * {@code consumer.subscribe(...)} call head, the ProducerRecord arguments and
 * parts of the send-callback) are not visible in this excerpt.
 */
public void run(ApplicationArguments args)
// Must be called exactly once per producer before any transactional work.
log.info("Initializing transaction");
producer.initTransactions();

log.info("Subscribing to topic {}", inputTopic);
Arrays.asList(inputTopic),
// The listener commits/aborts the open transaction on rebalances.
new TransactionalConsumerRebalanceListener());

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

records.forEach(inputRecord ->
"Received a recording of {}, partition={}, offset={}, epoch={}",
inputRecord.partition(),
inputRecord.offset(),
inputRecord.leaderEpoch());

// Remember the position of the record for the transactional offset commit.
// NOTE(review): see commitTransaction() — Kafka convention is to commit
// offset + 1 (the next record to read); storing the raw offset re-reads
// this record after a restart. Confirm intended semantics.
offsets[inputRecord.partition()] = inputRecord.offset();
leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();

// Split the line on non-word characters and send each word downstream.
String[] words = PATTERN.split(inputRecord.value());
for (int i = 0; i < words.length; i++)
ProducerRecord<String, String> outputRecord =
new ProducerRecord<>(
producer.send(outputRecord, (metadata, exception) ->
if (exception == null)
// Callback runs on the producer's I/O thread — logging only.
"Sent {}={}, partition={}, offset={}",
outputRecord.value(),
metadata.partition(),
"Could not send {}={}: {}",
outputRecord.value(),
exception.toString());

// Commit the running transaction once the configured interval has passed.
long delta = clock.millis() - lastCommit;
if (delta > commitInterval)
log.info("Elapsed time since last commit: {}ms", delta);
catch (WakeupException e)
// consumer.wakeup() is the expected way to break out of poll() on shutdown.
log.info("Waking up from exception!");
// Not necessary, because consumer.close() triggers onPartitionsRevoked()!
// commitTransaction();
log.error("Unexpected exception!", e);
producer.abortTransaction();
log.info("Closing consumer");
log.info("Closing producer");
log.info("Exiting!");
/**
 * Starts a new producer transaction and resets the commit timer, so that
 * {@code commitInterval} measures the age of the open transaction.
 */
private void beginTransaction()
log.info("Beginning new transaction");
lastCommit = clock.millis();
producer.beginTransaction();
/**
 * Sends the consumed offsets to the open transaction and commits it, making
 * the produced words and the offset commit atomic.
 *
 * NOTE(review): {@code OffsetAndMetadata} is built from the offset of the
 * LAST processed record; Kafka's convention is to commit the offset of the
 * NEXT record ({@code offsets[i] + 1}) — as written, the last record of every
 * partition is likely re-processed after a restart. Confirm and fix upstream.
 * NOTE(review): loop body lines filtering unassigned partitions (if any) are
 * not visible in this excerpt.
 */
private void commitTransaction()
Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
for (int i = 0; i < offsets.length; i++)
new TopicPartition(inputTopic, i),
new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
// Offsets must be committed through the producer (with the consumer-group
// metadata) so they become visible only if the transaction commits.
producer.sendOffsetsToTransaction(
consumer.groupMetadata());
log.info("Committing transaction");
producer.commitTransaction();
/**
 * Ties the producer transaction to the consumer's rebalance life cycle:
 * (re-)sizes the per-partition offset bookkeeping on assignment and aborts
 * the open transaction when partitions are lost.
 *
 * NOTE(review): braces, {@code @Override} lines and the stream head of the
 * max-partition computation are not visible in this excerpt.
 */
class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
log.info("Assigned partitions: {}", toString(partitions));

// Compute the length of an array that can be used to store the offsets:
// highest assigned partition number + 1.
// (We can ignore the topic, since we only read from a single one!)
(i, v) -> i < v.partition() ? v.partition() : i,
(l, r) -> l < r ? r : l) + 1;
offsets = new long[length];
// Raw Optional[] because generic arrays cannot be created directly.
leaderEpochs = new Optional[length];

public void onPartitionsRevoked(Collection<TopicPartition> partitions)
// Graceful revocation: offsets for the revoked partitions can still be
// committed (handled in the lines not visible here).
log.info("Revoked partitions: {}", toString(partitions));

public void onPartitionsLost(Collection<TopicPartition> partitions)
// Partitions were lost (e.g. after a session timeout): the in-flight work
// must not be committed, so the transaction is aborted.
log.info("Lost partitions: {}", toString(partitions));
producer.abortTransaction();

// Renders the partitions as "topic-partition" pairs for logging.
String toString(Collection<TopicPartition> partitions)
.map(tp -> tp.topic() + "-" + tp.partition())
.collect(Collectors.joining(", "));
// Tail of the shutdown method (presumably the @PreDestroy stop() hook — its
// signature and the wakeup/lock handling are not visible in this excerpt):
// logs the shutdown request and its completion.
log.info("Shutdown requested...");
log.info("Shutdown completed!");