+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.constraints.NotEmpty;
-
-
-@RestController
-public class RecorderController
-{
- private final String topic;
- private final KafkaProducer<String, String> producer;
-
-
- public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
- {
- this.topic = properties.getTopic();
- this.producer = producer;
- }
-
- @PostMapping(
- path = "/{username}",
- consumes = {
- MimeTypeUtils.TEXT_PLAIN_VALUE,
- MimeTypeUtils.APPLICATION_JSON_VALUE
- },
- produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
- DeferredResult<ResponseEntity<RecordingResult>> speak(
- @PathVariable
- @NotEmpty(message = "A username must be provided")
- String username,
- @RequestBody
- @NotEmpty(message = "The spoken sentence must not be empty!")
- String sentence)
- {
- DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
-
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
- producer.send(record, (metadata, exception) ->
- {
- if (metadata != null)
- {
- result.setResult(
- ResponseEntity.ok(RecordingResult.of(
- username,
- sentence,
- topic,
- metadata.partition(),
- metadata.offset(),
- null,
- null)));
- }
- else
- {
- result.setErrorResult(
- ResponseEntity
- .internalServerError()
- .body(RecordingResult.of(
- username,
- sentence,
- topic,
- null,
- null,
- HttpStatus.INTERNAL_SERVER_ERROR.value(),
- exception.toString())));
- }
- });
-
- return result;
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import javax.annotation.PreDestroy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@Component
+@Slf4j
+public class SplitterStreamProcessor implements ApplicationRunner
+{
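+ // Splits sentences into words on every run of non-word characters
+ // (i.e., everything except letters, digits and the underscore)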
+ final static Pattern PATTERN = Pattern.compile("\\W+");
+
+ private final String inputTopic;
+ private final String outputTopic;
+ private final KafkaConsumer<String, String> consumer;
+ private final KafkaProducer<String, String> producer;
+ private final Clock clock;
+ private final int commitInterval;
+ private final Lock running = new ReentrantLock();
+
+ private volatile boolean stopped = false; // written by stop() from another thread
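+ // Per-partition bookkeeping, indexed by partition number: the next
+ // offset to commit and the leader epoch of the last seen record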
+ private long[] offsets;
+ private Optional<Integer>[] leaderEpochs;
+ private long lastCommit;
+
+ public SplitterStreamProcessor(
+ SplitterApplicationProperties properties,
+ Clock clock)
+ {
+ this.inputTopic = properties.getInputTopic();
+ this.outputTopic = properties.getOutputTopic();
+
+ this.clock = clock;
+ this.commitInterval = properties.getCommitInterval();
+
+ Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+ Properties props;
+
+ props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumer = new KafkaConsumer<>(props);
+
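+ // The producer is configured for transactions: idempotence is
+ // required for transactional sends, and the transactional.id
+ // identifies this producer across restarts, so that crashed
+ // incarnations can be fenced off.
+ // Note: the ID is fixed here -- a second instance started with the
+ // same transactional.id would fence off this one.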
+ props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ producer = new KafkaProducer<>(props);
+ }
+
+ @Override
+ public void run(ApplicationArguments args)
+ {
+ running.lock();
+
+ try
+ {
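+ // Registers the transactional.id with the broker, fences off
+ // older incarnations of this producer and aborts any transaction
+ // such an incarnation may have left open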
+ log.info("Initializing transaction");
+ producer.initTransactions();
+
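+ // The rebalance listener ties the transaction life cycle to the
+ // partition assignment: begin on assignment, commit on revocation,
+ // abort when partitions are lost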
+ log.info("Subscribing to topic {}", inputTopic);
+ consumer.subscribe(
+ Arrays.asList(inputTopic),
+ new TransactionalConsumerRebalanceListener());
+
+ while (!stopped)
+ {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+ records.forEach(inputRecord ->
+ {
+ log.debug(
+ "Received a recording of {}, partition={}, offset={}, epoch={}",
+ inputRecord.key(),
+ inputRecord.partition(),
+ inputRecord.offset(),
+ inputRecord.leaderEpoch());
+
+ // Remember the position to commit: the offset of the NEXT record
+ // to be consumed from this partition, hence offset + 1
+ offsets[inputRecord.partition()] = inputRecord.offset() + 1;
+ leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
+ String[] words = PATTERN.split(inputRecord.value());
+ for (int i = 0; i < words.length; i++)
+ {
+ // A sentence that starts with a non-word character yields an
+ // empty first element -- skip it instead of sending empty words.
+ // (No trim() is needed: the split on \W+ removes all whitespace.)
+ if (words[i].isEmpty())
+ continue;
+
+ ProducerRecord<String, String> outputRecord =
+ new ProducerRecord<>(
+ outputTopic,
+ inputRecord.key(),
+ words[i]);
+
+ producer.send(outputRecord, (metadata, exception) ->
+ {
+ if (exception == null)
+ {
+ // Success: log the metadata assigned to the outgoing record
+ log.debug(
+ "Sent {}={}, partition={}, offset={}",
+ outputRecord.key(),
+ outputRecord.value(),
+ metadata.partition(),
+ metadata.offset());
+ }
+ else
+ {
+ // Error: only logged here -- a failed send poisons the ongoing
+ // transaction, so the subsequent commitTransaction() will throw
+ log.error(
+ "Could not send {}={}: {}",
+ outputRecord.key(),
+ outputRecord.value(),
+ exception.toString());
+ }
+ });
+ }
+ });
+
+ // Check outside of the record loop, so that an open transaction is
+ // also committed when no records are arriving -- otherwise, an idle
+ // instance would run into the transaction timeout.
+ // (lastCommit is only set once the first transaction was begun.)
+ if (lastCommit > 0)
+ {
+ long delta = clock.millis() - lastCommit;
+ if (delta > commitInterval)
+ {
+ log.info("Elapsed time since last commit: {}ms", delta);
+ commitTransaction();
+ beginTransaction();
+ }
+ }
+ }
+ }
+ catch (WakeupException e)
+ {
+ log.info("Waking up from exception!");
+ // Not necessary, because consumer.close() triggers
+ // onPartitionsRevoked(), which commits the transaction!
+ // commitTransaction();
+ }
+ catch (Exception e)
+ {
+ log.error("Unexpected exception!", e);
+ producer.abortTransaction();
+ }
+ finally
+ {
+ try
+ {
+ log.info("Closing consumer");
+ consumer.close();
+ log.info("Closing producer");
+ producer.close();
+ log.info("Exiting!");
+ }
+ finally
+ {
+ running.unlock();
+ }
+ }
+ }
+
+ private void beginTransaction()
+ {
+ log.info("Beginning new transaction");
+ lastCommit = clock.millis();
+ producer.beginTransaction();
+ }
+
+ private void commitTransaction()
+ {
+ Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+ for (int i = 0; i < offsets.length; i++)
+ {
+ if (offsets[i] > 0)
+ {
+ offsetsToSend.put(
+ new TopicPartition(inputTopic, i),
+ new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
+ }
+ }
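+ // Sending the offsets as part of the transaction commits them
+ // atomically with the produced records. Passing the group metadata
+ // enables fencing against zombie instances of the same consumer
+ // group (KIP-447).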
+ producer.sendOffsetsToTransaction(
+ offsetsToSend,
+ consumer.groupMetadata());
+ log.info("Committing transaction");
+ producer.commitTransaction();
+ }
+
+ class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
+ {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info("Assigned partitions: {}", toString(partitions));
+
+ // Compute the length of an array that can be used to store the
+ // offsets, indexed by partition number
+ // (We can ignore the topic, since we only read from a single one!)
+ int length =
+ partitions
+ .stream()
+ .mapToInt(TopicPartition::partition)
+ .max()
+ .orElse(-1) + 1;
+ offsets = new long[length];
+ leaderEpochs = new Optional[length];
+
+ beginTransaction();
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ log.info("Revoked partitions: {}", toString(partitions));
+ commitTransaction();
+ }
+
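+ // If partitions were lost (e.g. after a session timeout), another
+ // consumer may already own them and may have committed offsets for
+ // them -- hence, the open transaction must be aborted, not committed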
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.info("Lost partitions: {}", toString(partitions));
+ producer.abortTransaction();
+ }
+
+ String toString(Collection<TopicPartition> partitions)
+ {
+ return
+ partitions
+ .stream()
+ .map(tp -> tp.topic() + "-" + tp.partition())
+ .collect(Collectors.joining(", "));
+ }
+ }
+
+ @PreDestroy
+ public void stop()
+ {
+ log.info("Shutdown requested...");
+ stopped = true;
+ consumer.wakeup(); // Aborts a possibly blocking consumer.poll()
+ running.lock(); // Waits until run() has released the lock on exit
+ running.unlock();
+ log.info("Shutdown completed!");
+ }
+}