splitter: 1.0.0-vanilla-kafka - splits up the recorded sentences into words
authorKai Moritz <kai@juplo.de>
Fri, 27 May 2022 12:38:43 +0000 (14:38 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 Jun 2022 16:50:26 +0000 (18:50 +0200)
* Simple implementation of the splitter without Kafka Streams
* This version does not record its position
* Hence, in case of an error, it will most likly reprocess some messages,
  leading to wrong countings

12 files changed:
Dockerfile
pom.xml
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java [new file with mode: 0644]
src/main/resources/application.properties
src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java [new file with mode: 0644]

index 9c0b843..803477f 100644 (file)
@@ -1,5 +1,5 @@
 FROM openjdk:11-jre-slim
 COPY target/*.jar /opt/app.jar
-EXPOSE 8081
+EXPOSE 8086
 ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
 CMD []
diff --git a/pom.xml b/pom.xml
index edcedf5..fee0245 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -9,10 +9,10 @@
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
-       <artifactId>recorder</artifactId>
-       <version>1.0.1</version>
-       <name>Wordcount-Recorder</name>
-       <description>Recorder-service of the multi-user wordcount-example</description>
+       <artifactId>splitter</artifactId>
+       <version>1.0.0-vanilla-kafka</version>
+       <name>Wordcount-Splitter</name>
+       <description>A version of the stream-processor for the multi-user wordcount-example, that splits the sentences up into single words, that is implemented in Vanilla-Kafka (without Kafka Streams)</description>
        <properties>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
                <java.version>11</java.version>
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java
deleted file mode 100644 (file)
index abe0685..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.Properties;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(RecorderApplicationProperties.class)
-public class RecorderApplication
-{
-       @Bean(destroyMethod = "close")
-       KafkaProducer<String, String> producer(RecorderApplicationProperties properties)
-       {
-               Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
-
-               Properties props = new Properties();
-               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-               return new KafkaProducer<>(props);
-       }
-
-       public static void main(String[] args)
-       {
-               SpringApplication.run(RecorderApplication.class, args);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java
deleted file mode 100644 (file)
index 552ebaf..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.recorder")
-@Getter
-@Setter
-@ToString
-public class RecorderApplicationProperties
-{
-  private String bootstrapServer = "localhost:9092";
-  private String topic = "recordings";
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java
deleted file mode 100644 (file)
index 5fe69ad..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.constraints.NotEmpty;
-
-
-@RestController
-public class RecorderController
-{
-  private final String topic;
-  private final KafkaProducer<String, String> producer;
-
-
-  public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
-  {
-    this.topic = properties.getTopic();
-    this.producer = producer;
-  }
-
-  @PostMapping(
-      path = "/{username}",
-      consumes = {
-          MimeTypeUtils.TEXT_PLAIN_VALUE,
-          MimeTypeUtils.APPLICATION_JSON_VALUE
-      },
-      produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
-  DeferredResult<ResponseEntity<RecordingResult>> speak(
-      @PathVariable
-      @NotEmpty(message = "A username must be provided")
-      String username,
-      @RequestBody
-      @NotEmpty(message = "The spoken sentence must not be empty!")
-      String sentence)
-  {
-    DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
-
-    ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
-    producer.send(record, (metadata, exception) ->
-    {
-      if (metadata != null)
-      {
-        result.setResult(
-            ResponseEntity.ok(RecordingResult.of(
-                username,
-                sentence,
-                topic,
-                metadata.partition(),
-                metadata.offset(),
-                null,
-                null)));
-      }
-      else
-      {
-        result.setErrorResult(
-            ResponseEntity
-                .internalServerError()
-                .body(RecordingResult.of(
-                    username,
-                    sentence,
-                    topic,
-                    null,
-                    null,
-                    HttpStatus.INTERNAL_SERVER_ERROR.value(),
-                    exception.toString())));
-      }
-    });
-
-    return result;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java
deleted file mode 100644 (file)
index 939b1d4..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import lombok.Value;
-
-import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
-
-
-@Value(staticConstructor = "of")
-public class RecordingResult
-{
-  @JsonInclude(NON_NULL) private final String username;
-  @JsonInclude(NON_NULL) private final String sentence;
-  @JsonInclude(NON_NULL) private final String topic;
-  @JsonInclude(NON_NULL) private final Integer partition;
-  @JsonInclude(NON_NULL) private final Long offset;
-  @JsonInclude(NON_NULL) private final Integer status;
-  @JsonInclude(NON_NULL) private final String error;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
new file mode 100644 (file)
index 0000000..d46a7cd
--- /dev/null
@@ -0,0 +1,26 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+import java.time.Clock;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(SplitterApplicationProperties.class)
+public class SplitterApplication
+{
+       @Bean
+       Clock clock()
+       {
+               return Clock.systemDefaultZone();
+       }
+
+
+       public static void main(String[] args)
+       {
+               SpringApplication.run(SplitterApplication.class, args);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java
new file mode 100644 (file)
index 0000000..f699f32
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka.wordcount.splitter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.splitter")
+@Getter
+@Setter
+@ToString
+public class SplitterApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String groupId = "splitter";
+  private String inputTopic = "recordings";
+  private String outputTopic = "words";
+  private int commitInterval = 1000;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
new file mode 100644 (file)
index 0000000..791e164
--- /dev/null
@@ -0,0 +1,260 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import javax.annotation.PreDestroy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@Component
+@Slf4j
+public class SplitterStreamProcessor implements ApplicationRunner
+{
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final KafkaConsumer<String, String> consumer;
+  private final KafkaProducer<String, String> producer;
+  private final Clock clock;
+  private final int commitInterval;
+  private final Lock running = new ReentrantLock();
+
+  private boolean stopped = false;
+  private long[] offsets;
+  private Optional<Integer>[] leaderEpochs;
+  private long lastCommit;
+
+  public SplitterStreamProcessor(
+      SplitterApplicationProperties properties,
+      Clock clock)
+  {
+    this.inputTopic = properties.getInputTopic();
+    this.outputTopic = properties.getOutputTopic();
+
+    this.clock = clock;
+    this.commitInterval = properties.getCommitInterval();
+
+    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+    Properties props;
+
+    props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    consumer = new KafkaConsumer<>(props);
+
+    props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producer = new KafkaProducer<>(props);
+  }
+
+  @Override
+  public void run(ApplicationArguments args)
+  {
+    running.lock();
+
+    try
+    {
+      log.info("Initializing transaction");
+      producer.initTransactions();
+
+      log.info("Subscribing to topic {}", inputTopic);
+      consumer.subscribe(
+          Arrays.asList(inputTopic),
+          new TransactionalConsumerRebalanceListener());
+
+      while (!stopped)
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+        records.forEach(inputRecord ->
+        {
+          log.debug(
+              "Received a recording of {}, partition={}, offset={}, epoch={}",
+              inputRecord.key(),
+              inputRecord.partition(),
+              inputRecord.offset(),
+              inputRecord.leaderEpoch());
+
+          offsets[inputRecord.partition()] = inputRecord.offset();
+          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
+          String[] words = PATTERN.split(inputRecord.value());
+          for (int i = 0; i < words.length; i++)
+          {
+            ProducerRecord<String, String> outputRecord =
+                new ProducerRecord<>(
+                    outputTopic,
+                    inputRecord.key(),
+                    words[i].trim());
+
+            producer.send(outputRecord, (metadata, exception) ->
+            {
+              if (exception == null)
+              {
+                // HANDLE SUCCESS
+                log.debug(
+                    "Sent {}={}, partition={}, offset={}",
+                    outputRecord.key(),
+                    outputRecord.value(),
+                    metadata.partition(),
+                    metadata.offset());
+              }
+              else
+              {
+                // HANDLE ERROR
+                log.error(
+                    "Could not send {}={}: {}",
+                    outputRecord.key(),
+                    outputRecord.value(),
+                    exception.toString());
+              }
+            });
+          }
+
+          long delta = clock.millis() - lastCommit;
+          if (delta > commitInterval)
+          {
+            log.info("Elapsed time since last commit: {}ms", delta);
+            commitTransaction();
+            beginTransaction();
+          }
+        });
+      }
+    }
+    catch (WakeupException e)
+    {
+      log.info("Waking up from exception!");
+      // Nicht nötig, da consumer.close() onPartitionsRevoked() auslöst!
+      // commitTransaction();
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception!", e);
+      producer.abortTransaction();
+    }
+    finally
+    {
+      try
+      {
+        log.info("Closing consumer");
+        consumer.close();
+        log.info("Closing producer");
+        producer.close();
+        log.info("Exiting!");
+      }
+      finally
+      {
+        running.unlock();
+      }
+    }
+  }
+
+  private void beginTransaction()
+  {
+    log.info("Beginning new transaction");
+    lastCommit = clock.millis();
+    producer.beginTransaction();
+  }
+
+  private void commitTransaction()
+  {
+    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+    for (int i = 0; i < offsets.length; i++)
+    {
+      if (offsets[i] > 0)
+      {
+        offsetsToSend.put(
+            new TopicPartition(inputTopic, i),
+            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
+      }
+    }
+    producer.sendOffsetsToTransaction(
+        offsetsToSend,
+        consumer.groupMetadata());
+    log.info("Committing transaction");
+    producer.commitTransaction();
+  }
+
+  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
+  {
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+    {
+      log.info("Assigned partitions: {}", toString(partitions));
+
+      // Compote the length of an array, that can be used to store the offsets
+      // (We can ignore the topic, since we only read from a single one!)
+      int length =
+          partitions
+              .stream()
+              .reduce(
+                  0,
+                  (i, v) -> i < v.partition() ? v.partition() : i,
+                  (l, r) -> l < r ? r : l) + 1;
+      offsets = new long[length];
+      leaderEpochs = new Optional[length];
+
+      beginTransaction();
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+    {
+      log.info("Revoked partitions: {}", toString(partitions));
+      commitTransaction();
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions)
+    {
+      log.info("Lost partitions: {}", toString(partitions));
+      producer.abortTransaction();
+    }
+
+    String toString(Collection<TopicPartition> partitions)
+    {
+      return
+          partitions
+              .stream()
+              .map(tp -> tp.topic() + "-" + tp.partition())
+              .collect(Collectors.joining(", "));
+    }
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    log.info("Shutdown requested...");
+    stopped = true;
+    consumer.wakeup();
+    running.lock();
+    log.info("Shutdown completed!");
+  }
+}
index 28d43c9..0d2624f 100644 (file)
@@ -1,2 +1,3 @@
-server.port=8081
+server.port=8086
 management.endpoints.web.exposure.include=*
+logging.level.de.juplo.kafka.wordcount.splitter=DEBUG
diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java
deleted file mode 100644 (file)
index 885a408..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class ApplicationTests
-{
-       @Test
-       void contextLoads()
-       {
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
new file mode 100644 (file)
index 0000000..77266a9
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class ApplicationTests
+{
+       @Test
+       void contextLoads()
+       {
+       }
+}