1 package de.juplo.kafka.wordcount.splitter;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.*;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerConfig;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.WakeupException;
10 import org.apache.kafka.common.serialization.StringDeserializer;
11 import org.apache.kafka.common.serialization.StringSerializer;
12 import org.springframework.core.task.TaskExecutor;
13 import org.springframework.stereotype.Component;
14 import org.springframework.util.Assert;
16 import javax.annotation.PreDestroy;
17 import java.time.Clock;
18 import java.time.Duration;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.regex.Pattern;
23 import java.util.stream.Collectors;
/**
 * Splits incoming text records into single words and writes one output
 * record per word, using a transactional (read-process-write) Kafka
 * producer: input offsets are committed via the producer transaction
 * (see commitTransaction()), not via consumer auto-commit.
 * Runs as a background task scheduled on the TaskExecutor passed to the
 * constructor.
 */
28 public class SplitterStreamProcessor implements Runnable
// Word boundary: one or more non-word characters.
30 final static Pattern PATTERN = Pattern.compile("\\W+");
32 private final String inputTopic;
33 private final String outputTopic;
34 private final KafkaConsumer<String, String> consumer;
35 private final KafkaProducer<String, String> producer;
// Injected clock (testable time); drives the commit-interval check.
36 private final Clock clock;
// Milliseconds that may elapse before the open transaction is committed.
37 private final int commitInterval;
// NOTE(review): acquisition/release of this lock is not visible in this
// excerpt — presumably guards the poll loop against concurrent shutdown.
38 private final Lock running = new ReentrantLock();
// Presumably flipped by the shutdown method to end the poll loop —
// the writing site is not visible in this excerpt; confirm in full file.
40 private boolean stopped = false;
// Last seen offset per input-topic partition (indexed by partition number).
41 private long[] offsets;
// Leader epoch matching offsets[]; both arrays are sized in
// TransactionalConsumerRebalanceListener.onPartitionsAssigned().
42 private Optional<Integer>[] leaderEpochs;
// clock.millis() timestamp of the most recent beginTransaction().
43 private long lastCommit;
/**
 * Wires up the transactional consumer/producer pair from the application
 * properties and schedules this Runnable on the given executor.
 *
 * @param properties topics, group id, bootstrap server, commit interval
 * @param executor   executes this processor's poll loop
 */
45 public SplitterStreamProcessor(
46 SplitterApplicationProperties properties,
48 TaskExecutor executor)
50 this.inputTopic = properties.getInputTopic();
51 this.outputTopic = properties.getOutputTopic();
54 this.commitInterval = properties.getCommitInterval();
56 Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
// Consumer: auto-commit is disabled on purpose — offsets are committed
// through the producer transaction in commitTransaction() instead.
60 props = new Properties();
61 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
62 props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
63 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
64 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
65 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
66 consumer = new KafkaConsumer<>(props);
// Producer: idempotent + transactional for exactly-once output.
// NOTE(review): transactional.id is hard-coded to "my-ID" — a second
// instance started with the same id would fence this one; confirm that a
// single-instance deployment is intended.
68 props = new Properties();
69 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
70 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
71 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
72 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
73 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
74 producer = new KafkaProducer<>(props);
// Start the poll loop in the background.
76 executor.execute(this);
// --- poll-loop body; the enclosing method header is elided in this
// excerpt (presumably run(), since the class implements Runnable and the
// constructor schedules `this` on the executor) ---
85 log.info("Initializing transaction");
// Registers the transactional.id with the broker and fences older
// producer instances with the same id.
86 producer.initTransactions();
88 log.info("Subscribing to topic {}", inputTopic);
90 Arrays.asList(inputTopic),
// Listener ties partition (re)assignment to the transaction bookkeeping.
91 new TransactionalConsumerRebalanceListener());
95 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
97 records.forEach(inputRecord ->
100 "Received a recording of {}, partition={}, offset={}, epoch={}",
102 inputRecord.partition(),
103 inputRecord.offset(),
104 inputRecord.leaderEpoch());
// Remember offset/epoch per partition so commitTransaction() can send
// them along with the transaction.
106 offsets[inputRecord.partition()] = inputRecord.offset();
107 leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
// Split the input value into words; one output record per word.
109 String[] words = PATTERN.split(inputRecord.value());
110 for (int i = 0; i < words.length; i++)
112 ProducerRecord<String, String> outputRecord =
113 new ProducerRecord<>(
// Async send; the callback only logs the outcome.
118 producer.send(outputRecord, (metadata, exception) ->
120 if (exception == null)
124 "Sent {}={}, partition={}, offset={}",
126 outputRecord.value(),
127 metadata.partition(),
134 "Could not send {}={}: {}",
136 outputRecord.value(),
137 exception.toString());
// Commit the open transaction once the configured interval has elapsed.
142 long delta = clock.millis() - lastCommit;
143 if (delta > commitInterval)
145 log.info("Elapsed time since last commit: {}ms", delta);
152 catch (WakeupException e)
// Expected shutdown path: consumer.wakeup() aborts the blocking poll().
154 log.info("Waking up from exception!");
155 // Not necessary, because consumer.close() triggers onPartitionsRevoked()!
156 // commitTransaction();
// Any other exception: abort the open transaction before exiting.
160 log.error("Unexpected exception!", e);
161 producer.abortTransaction();
167 log.info("Closing consumer");
169 log.info("Closing producer");
171 log.info("Exiting!");
// Opens a new producer transaction and remembers the start time, which
// the poll loop compares against commitInterval.
180 private void beginTransaction()
182 log.info("Beginning new transaction");
183 lastCommit = clock.millis();
184 producer.beginTransaction();
// Commits the open transaction, including the consumed input offsets
// (exactly-once: offsets and produced words commit or abort together).
187 private void commitTransaction()
// Collect the per-partition offset/epoch pairs recorded by the poll loop.
189 Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
190 for (int i = 0; i < offsets.length; i++)
195 new TopicPartition(inputTopic, i),
196 new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
// groupMetadata() lets the broker fence commits from zombie instances.
199 producer.sendOffsetsToTransaction(
201 consumer.groupMetadata());
202 log.info("Committing transaction");
203 producer.commitTransaction();
// Rebalance listener that couples partition assignment to the
// transaction bookkeeping: sizes the offset arrays on assignment and
// aborts the open transaction when partitions are lost.
206 class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
209 public void onPartitionsAssigned(Collection<TopicPartition> partitions)
211 log.info("Assigned partitions: {}", toString(partitions));
213 // Compute the length of an array, that can be used to store the offsets
214 // (We can ignore the topic, since we only read from a single one!)
// The reduce yields the highest assigned partition number; +1 turns it
// into the required array length.
220 (i, v) -> i < v.partition() ? v.partition() : i,
221 (l, r) -> l < r ? r : l) + 1;
222 offsets = new long[length];
// Raw Optional[]: generic arrays cannot be created directly in Java;
// the entries are filled per record in the poll loop.
223 leaderEpochs = new Optional[length];
// Logs the revocation; per the note in the shutdown path, closing the
// consumer triggers this callback (further handling elided here).
229 public void onPartitionsRevoked(Collection<TopicPartition> partitions)
231 log.info("Revoked partitions: {}", toString(partitions));
// Partitions were lost (not cleanly revoked): abort the open transaction,
// since its recorded offsets can no longer be committed safely.
236 public void onPartitionsLost(Collection<TopicPartition> partitions)
238 log.info("Lost partitions: {}", toString(partitions));
239 producer.abortTransaction();
// Renders the partitions as comma-separated "topic-partition" pairs for
// the log messages above (overload — does not override Object.toString()).
242 String toString(Collection<TopicPartition> partitions)
247 .map(tp -> tp.topic() + "-" + tp.partition())
248 .collect(Collectors.joining(", "));
// --- shutdown-method body; the signature is elided in this excerpt.
// NOTE(review): presumably the @PreDestroy hook (javax.annotation
// .PreDestroy is imported) that stops the poll loop — confirm in the
// full file which mechanism (flag, wakeup, lock) sits between these logs.
255 log.info("Shutdown requested...");
259 log.info("Shutdown completed!");