<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
- <artifactId>recorder</artifactId>
- <version>1.0.1</version>
- <name>Wordcount-Recorder</name>
- <description>Recorder-service of the multi-user wordcount-example</description>
+ <artifactId>kafka-splitter</artifactId>
+ <version>1.0.0</version>
+ <name>Wordcount-Splitter</name>
+ <description>Stream-processor of the multi-user wordcount-example that splits sentences into single words</description>
<properties>
<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
<java.version>11</java.version>
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.Properties;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(RecorderApplicationProperties.class)
-public class RecorderApplication
-{
- @Bean(destroyMethod = "close")
- KafkaProducer<String, String> producer(RecorderApplicationProperties properties)
- {
- Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- return new KafkaProducer<>(props);
- }
-
- public static void main(String[] args)
- {
- SpringApplication.run(RecorderApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.recorder")
-@Getter
-@Setter
-@ToString
-public class RecorderApplicationProperties
-{
- private String bootstrapServer = "localhost:9092";
- private String topic = "recordings";
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.constraints.NotEmpty;
-
-
-@RestController
-public class RecorderController
-{
- private final String topic;
- private final KafkaProducer<String, String> producer;
-
-
- public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
- {
- this.topic = properties.getTopic();
- this.producer = producer;
- }
-
- @PostMapping(
- path = "/{username}",
- consumes = {
- MimeTypeUtils.TEXT_PLAIN_VALUE,
- MimeTypeUtils.APPLICATION_JSON_VALUE
- },
- produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
- DeferredResult<ResponseEntity<RecordingResult>> speak(
- @PathVariable
- @NotEmpty(message = "A username must be provided")
- String username,
- @RequestBody
- @NotEmpty(message = "The spoken sentence must not be empty!")
- String sentence)
- {
- DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
-
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
- producer.send(record, (metadata, exception) ->
- {
- if (metadata != null)
- {
- result.setResult(
- ResponseEntity.ok(RecordingResult.of(
- username,
- sentence,
- topic,
- metadata.partition(),
- metadata.offset(),
- null,
- null)));
- }
- else
- {
- result.setErrorResult(
- ResponseEntity
- .internalServerError()
- .body(RecordingResult.of(
- username,
- sentence,
- topic,
- null,
- null,
- HttpStatus.INTERNAL_SERVER_ERROR.value(),
- exception.toString())));
- }
- });
-
- return result;
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import lombok.Value;
-
-import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
-
-
-@Value(staticConstructor = "of")
-public class RecordingResult
-{
- @JsonInclude(NON_NULL) private final String username;
- @JsonInclude(NON_NULL) private final String sentence;
- @JsonInclude(NON_NULL) private final String topic;
- @JsonInclude(NON_NULL) private final Integer partition;
- @JsonInclude(NON_NULL) private final Long offset;
- @JsonInclude(NON_NULL) private final Integer status;
- @JsonInclude(NON_NULL) private final String error;
-}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.time.Clock;
+import java.util.Properties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(SplitterApplicationProperties.class)
+public class SplitterApplication
+{
+ @Bean(destroyMethod = "close")
+ KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
+ {
+ Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
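+ // Offsets are committed as part of the producer's transactions (see
+ // SplitterStreamProcessor), hence the automatic offset-commit of the
+ // consumer has to be switched off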
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return new KafkaConsumer<>(props);
+ }
+
+ @Bean(destroyMethod = "close")
+ KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
+ {
+ Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
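+ // The transactional id enables transactions and is used by Kafka to fence
+ // off zombie-instances: since the id is hard-coded here, only a single
+ // instance of the splitter can be run at a time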
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return new KafkaProducer<>(props);
+ }
+
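+ // The clock is provided as a bean, so that tests can replace it with a
+ // fixed or mocked clock and control the commit timing deterministically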
+ @Bean
+ Clock clock()
+ {
+ return Clock.systemDefaultZone();
+ }
+
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(SplitterApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.splitter")
+@Getter
+@Setter
+@ToString
+public class SplitterApplicationProperties
+{
+ private String bootstrapServer = "localhost:9092";
+ private String groupId = "splitter";
+ private String inputTopic = "recordings";
+ private String outputTopic = "words";
+ private int commitInterval = 1000; // Milliseconds between two transaction-commits
+}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@Component
+@Slf4j
+public class SplitterStreamProcessor implements ApplicationRunner
+{
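+ // Matches runs of non-word characters, at which the sentences are split into words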
+ final static Pattern PATTERN = Pattern.compile("\\W+");
+
+ private final String inputTopic;
+ private final String outputTopic;
+ private final String groupId;
+ private final KafkaConsumer<String, String> consumer;
+ private final KafkaProducer<String, String> producer;
+ private final Clock clock;
+ private final int commitInterval;
+
+ private boolean stopped = false;
+ private long lastCommit;
+ private boolean transactionInProgress = false;
+
+ public SplitterStreamProcessor(
+ SplitterApplicationProperties properties,
+ KafkaConsumer<String, String> consumer,
+ KafkaProducer<String,String> producer,
+ Clock clock)
+ {
+ this.inputTopic = properties.getInputTopic();
+ this.outputTopic = properties.getOutputTopic();
+ this.groupId = properties.getGroupId();
+
+ this.consumer = consumer;
+ this.producer = producer;
+
+ this.clock = clock;
+ this.commitInterval = properties.getCommitInterval();
+ }
+
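+ // The processing loop: sentences are consumed, split into words and the
+ // words are produced inside a transaction that is committed periodically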
+ @Override
+ public void run(ApplicationArguments args)
+ {
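+ // initTransactions() registers the transactional id, aborts any transaction
+ // that a crashed instance may have left open and fences off older instances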
+ log.info("Initializing transaction");
+ producer.initTransactions();
+
+ try
+ {
+ log.info("C - Subscribing to topic test");
+ consumer.subscribe(
+ Arrays.asList(inputTopic),
+ new TransactionalConsumerRebalanceListener());
+
+ while (!stopped)
+ {
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
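+ // Split each consumed sentence at non-word characters and send one
+ // record per word, keyed by the user the sentence belongs to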
+ records.forEach(inputRecord ->
+ {
+ String[] words = PATTERN.split(inputRecord.value());
+ for (int i = 0; i < words.length; i++)
+ {
+ if (words[i].isEmpty())
+ continue; // split() yields an empty first token, if a sentence starts with a non-word character
+ ProducerRecord<String, String> outputRecord =
+ new ProducerRecord<>(
+ outputTopic,
+ inputRecord.key(),
+ words[i].trim());
+
+ producer.send(outputRecord, (metadata, exception) ->
+ {
+ if (exception == null)
+ {
+ // HANDLE SUCCESS
+ log.info(
+ "sent {}={}, partition={}, offset={}",
+ outputRecord.key(),
+ outputRecord.value(),
+ metadata.partition(),
+ metadata.offset());
+ }
+ else
+ {
+ // HANDLE ERROR
+ log.error(
+ "could not send {}={}: {}",
+ outputRecord.key(),
+ outputRecord.value(),
+ exception.toString());
+ }
+ });
+ }
+ });
+
+ // Commit the transaction and begin a new one, whenever the configured
+ // commit-interval has elapsed since the last commit
+ long delta = clock.millis() - lastCommit;
+ if (transactionInProgress && delta > commitInterval)
+ {
+ log.info("Elapsed time since last commit: {}ms", delta);
+ commitTransaction();
+ beginTransaction();
+ }
+ }
+ }
+ catch (WakeupException e)
+ {
+ log.info("Waking up from exception!", e);
+ commitTransaction();
+ }
+ catch (Exception e)
+ {
+ log.error("Unexpected exception!", e);
+ if (transactionInProgress)
+ abortTransaction();
+ }
+ finally
+ {
+ log.info("Closing consumer");
+ consumer.close();
+ log.info("C - DONE!");
+ }
+ }
+
+ private void beginTransaction()
+ {
+ log.info("Beginning new transaction");
+ lastCommit = clock.millis();
+ producer.beginTransaction();
+ transactionInProgress = true;
+ }
+
+ private void commitTransaction()
+ {
+ log.info("Committing transaction");
+ // Send the offsets of the consumed input to the transaction, so that the
+ // consumed input and the produced output are committed atomically
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ for (TopicPartition partition : consumer.assignment())
+ offsets.put(partition, new OffsetAndMetadata(consumer.position(partition)));
+ producer.sendOffsetsToTransaction(offsets, groupId);
+ producer.commitTransaction();
+ transactionInProgress = false;
+ }
+
+ private void abortTransaction()
+ {
+ log.info("Aborting transaction");
+ producer.abortTransaction();
+ transactionInProgress = false;
+ }
+
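+ // Aligns the transaction lifecycle with the partition assignment: a new
+ // transaction is begun when partitions are assigned, the open transaction is
+ // committed when partitions are revoked and aborted when they are lost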
+ class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
+ {
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ log.info(
+ "Assigned partitions: {}",
+ partitions
+ .stream()
+ .map(tp -> tp.topic() + "-" + tp.partition())
+ .collect(Collectors.joining(", ")));
+
+ // A transaction has to be open, before the first record can be sent
+ if (!transactionInProgress)
+ beginTransaction();
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ log.info(
+ "Revoked partitions: {}",
+ partitions
+ .stream()
+ .map(tp -> tp.topic() + "-" + tp.partition())
+ .collect(Collectors.joining(", ")));
+
+ if (transactionInProgress)
+ commitTransaction();
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.info(
+ "Lost partitions: {}",
+ partitions
+ .stream()
+ .map(tp -> tp.topic() + "-" + tp.partition())
+ .collect(Collectors.joining(", ")));
+
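+ // Lost partitions may already be processed by another instance, hence
+ // the pending transaction must not be committed, but aborted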
+ if (transactionInProgress)
+ abortTransaction();
+ }
+ }
+
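+ // wakeup() makes a blocking poll() throw a WakeupException, so that the
+ // processing loop can terminate promptly on shutdown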
+ @PreDestroy
+ public void stop()
+ {
+ log.info("Stopping Consumer");
+ stopped = true;
+ consumer.wakeup();
+ }
+}