package de.juplo.kafka.wordcount.splitter;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.PreDestroy;
import java.time.Clock;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;


@Slf4j
@Component
public class SplitterStreamProcessor implements Runnable
{
  private final MessageSplitter splitter;
  private final String inputTopic;
  private final String outputTopic;
  private final KafkaConsumer<String, String> consumer;
  private final KafkaProducer<String, String> producer;
  private final Clock clock;
  private final int commitInterval;
  private final Lock running = new ReentrantLock();

  private boolean stopped = false;
  private long[] offsets;
  private Optional<Integer>[] leaderEpochs;
  private long lastCommit;

  public SplitterStreamProcessor(
      MessageSplitter splitter,
      SplitterApplicationProperties properties,
      Clock clock,
      TaskExecutor executor)
  {
    this.splitter = splitter;

    this.inputTopic = properties.getInputTopic();
    this.outputTopic = properties.getOutputTopic();

    this.clock = clock;
    this.commitInterval = properties.getCommitInterval();

    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");

    Properties props;

    props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumer = new KafkaConsumer<>(props);

    props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producer = new KafkaProducer<>(props);

    executor.execute(this);
  }

  public void run()
  {
    running.lock();

    try
    {
      log.info("Initializing transaction");
      producer.initTransactions();

      log.info("Subscribing to topic {}", inputTopic);
      consumer.subscribe(
          Arrays.asList(inputTopic),
          new TransactionalConsumerRebalanceListener());

      while (!stopped)
      {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        records.forEach(inputRecord ->
        {
          log.info(
              "Received a record: {}, partition={}, offset={}, epoch={}",
              inputRecord.value(),
              inputRecord.partition(),
              inputRecord.offset(),
              inputRecord.leaderEpoch());

          // Remember offset and leader-epoch, so that they can be committed with the transaction
          offsets[inputRecord.partition()] = inputRecord.offset();
          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();

          String[] words = splitter.split(inputRecord.value());
          for (int i = 0; i < words.length; i++)
          {
            ProducerRecord<String, String> outputRecord =
                new ProducerRecord<>(
                    outputTopic,
                    inputRecord.key(),
                    words[i]);

            producer.send(outputRecord, (metadata, exception) ->
            {
              if (exception == null)
              {
                log.debug(
                    "Sent {}={}, partition={}, offset={}",
                    outputRecord.key(),
                    outputRecord.value(),
                    metadata.partition(),
                    metadata.offset());
              }
              else
              {
                log.error(
                    "Could not send {}={}: {}",
                    outputRecord.key(),
                    outputRecord.value(),
                    exception.toString());
              }
            });
          }

          long delta = clock.millis() - lastCommit;
          if (delta > commitInterval)
          {
            log.info("Elapsed time since last commit: {}ms", delta);
            commitTransaction();
            beginTransaction();
          }
        });
      }
    }
    catch (WakeupException e)
    {
      log.info("Waking up from exception!");
      // Not necessary, because consumer.close() triggers onPartitionsRevoked()!
      // commitTransaction();
    }
    catch (Exception e)
    {
      log.error("Unexpected exception!", e);
      producer.abortTransaction();
    }
    finally
    {
      try
      {
        log.info("Closing consumer");
        consumer.close();
        log.info("Closing producer");
        producer.close();
        log.info("Exiting!");
      }
      finally
      {
        // Release the lock, so that a concurrently waiting call to stop() can return
        running.unlock();
      }
    }
  }

  private void beginTransaction()
  {
    log.info("Beginning new transaction");
    lastCommit = clock.millis();
    producer.beginTransaction();
  }

  private void commitTransaction()
  {
    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
    for (int i = 0; i < offsets.length; i++)
    {
      if (offsets[i] > 0)
      {
        offsetsToSend.put(
            new TopicPartition(inputTopic, i),
            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
      }
    }
    producer.sendOffsetsToTransaction(
        offsetsToSend,
        consumer.groupMetadata());
    log.info("Committing transaction");
    producer.commitTransaction();
  }

  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
  {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
    {
      log.info("Assigned partitions: {}", toString(partitions));

      // Compute the length of an array that can be used to store the offsets
      // (We can ignore the topic, since we only read from a single one!)
      int length =
          partitions
              .stream()
              .reduce(
                  0,
                  (i, v) -> i < v.partition() ? v.partition() : i,
                  (l, r) -> l < r ? r : l) + 1;
      offsets = new long[length];
      leaderEpochs = new Optional[length];

      beginTransaction();
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
    {
      log.info("Revoked partitions: {}", toString(partitions));
      commitTransaction();
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions)
    {
      log.info("Lost partitions: {}", toString(partitions));
      producer.abortTransaction();
    }

    String toString(Collection<TopicPartition> partitions)
    {
      return
          partitions
              .stream()
              .map(tp -> tp.topic() + "-" + tp.partition())
              .collect(Collectors.joining(", "));
    }
  }

  @PreDestroy
  public void stop()
  {
    log.info("Shutdown requested...");

    if (stopped)
    {
      log.warn("Ignoring request: already stopped!");
      return;
    }

    stopped = true;
    consumer.wakeup();
    // Wait for the processing-loop to release the lock after closing consumer and producer
    running.lock();
    log.info("Shutdown completed!");
  }
}